This repository has been archived by the owner on Jan 11, 2021. It is now read-only.
Row-based read support #68
Merged
Changes from 9 commits
17 commits
561d1ce make small changes for read support (sadikovi)
266f231 add record api (sadikovi)
ca5de1e add bin read-file (sadikovi)
6b5022d update pattern match (sadikovi)
0644d13 add test files for reader (sadikovi)
b9a63da move file methods to test_common (sadikovi)
8ddebd7 add tests for triplet (sadikovi)
4294586 add tests for reader (sadikovi)
c0b0d5b remove new line (sadikovi)
bc11250 rename bin files (sadikovi)
c28bdea update num-records messages (sadikovi)
598d840 minor changes, add compatibility link (sadikovi)
2476ae3 update file api (sadikovi)
e6ec884 add test (sadikovi)
91254ed Merge remote-tracking branch 'origin/master' into read-support-final (sadikovi)
8de8a5d downgrade Rust nightly version for the build (sadikovi)
aefb3f4 downgrade rust nightly to nightly-2018-03-26 in travis (sadikovi)
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
extern crate parquet; | ||
|
||
use std::env; | ||
use std::fs::File; | ||
use std::path::Path; | ||
use std::process; | ||
|
||
use parquet::file::reader::{FileReader, SerializedFileReader}; | ||
|
||
fn main() { | ||
let args: Vec<String> = env::args().collect(); | ||
if args.len() != 2 && args.len() != 3 { | ||
println!("Usage: read-file <file-path> <num-records>"); | ||
Review comment: Change |
Reply: Yes, will change. |
||
process::exit(1); | ||
} | ||
|
||
let mut num_records: Option<usize> = None; | ||
if args.len() == 3 { | ||
match args[2].parse() { | ||
Ok(value) => num_records = Some(value), | ||
Err(e) => panic!("Error when reading value for <num-records>, {}", e) | ||
} | ||
} | ||
|
||
let path = Path::new(&args[1]); | ||
let file = File::open(&path).unwrap(); | ||
let parquet_reader = SerializedFileReader::new(file).unwrap(); | ||
|
||
// Use full schema as projected schema | ||
let mut iter = parquet_reader.get_row_iter(None).unwrap(); | ||
|
||
let mut start = 0; | ||
let end = num_records.unwrap_or(0); | ||
let all_records = num_records.is_none(); | ||
|
||
while all_records || start < end { | ||
match iter.next() { | ||
Some(row) => println!("{}", row), | ||
None => break, | ||
} | ||
start += 1; | ||
} | ||
} |
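
The loop in `read-file` above tracks `start`, `end`, and `all_records` by hand to print either every record or at most `<num-records>` of them. As a minimal sketch of the same behavior, an `Option<usize>` limit can be passed to `Iterator::take`. The helper names `parse_num_records` and `limited` are hypothetical and not part of this PR, and a plain integer range stands in for the row iterator so the example needs only the standard library.

```rust
/// Parse the optional `<num-records>` argument (hypothetical helper).
/// `args` mirrors `env::args()`: program name, file path, optional count.
fn parse_num_records(args: &[String]) -> Result<Option<usize>, String> {
    match args.get(2) {
        Some(raw) => raw
            .parse::<usize>()
            .map(Some)
            .map_err(|e| format!("Error when reading value for <num-records>, {}", e)),
        None => Ok(None),
    }
}

/// Yield every item when `limit` is `None`, otherwise at most `limit` items.
fn limited<I: Iterator>(iter: I, limit: Option<usize>) -> impl Iterator<Item = I::Item> {
    iter.take(limit.unwrap_or(usize::MAX))
}

fn main() {
    // Simulated command line; a real binary would collect `std::env::args()`.
    let args: Vec<String> = vec![
        "read-file".to_string(),
        "data.parquet".to_string(),
        "3".to_string(),
    ];
    let limit = parse_num_records(&args).expect("invalid <num-records>");

    // The range is an assumption standing in for the iterator from `get_row_iter`.
    for row in limited(0..10, limit) {
        println!("{}", row);
    }
}
```

Returning a `Result` from the parsing step instead of panicking lets the caller decide whether to print the usage message and exit, which is a design choice rather than something this PR does.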
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,10 +26,11 @@ use byteorder::{LittleEndian, ByteOrder}; | |
use thrift::protocol::TCompactInputProtocol; | ||
use parquet_thrift::parquet::FileMetaData as TFileMetaData; | ||
use parquet_thrift::parquet::{PageType, PageHeader}; | ||
use schema::types::{self, SchemaDescriptor}; | ||
use schema::types::{self, Type as SchemaType, SchemaDescriptor}; | ||
use column::page::{Page, PageReader}; | ||
use column::reader::{ColumnReader, ColumnReaderImpl}; | ||
use compression::{Codec, create_codec}; | ||
use record::reader::{FileRowIter, RowIter, TreeBuilder}; | ||
use util::io::FileChunk; | ||
use util::memory::ByteBufferPtr; | ||
|
||
|
@@ -50,6 +51,11 @@ pub trait FileReader { | |
/// the same as this. Otherwise, the row group metadata stored in the row group reader | ||
/// may outlive the file reader. | ||
fn get_row_group(&self, i: usize) -> Result<Box<RowGroupReader>>; | ||
|
||
/// Get full iterator of `Row` from a file (over all row groups). | ||
/// Projected schema can be a subset of or equal to the file schema, when it is None, | ||
/// full file schema is assumed. | ||
fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<FileRowIter>; | ||
Review comment: Can we use a single type of iterator for both |
Reply: Yes, we should be able to, will change. |
||
} | ||
|
||
/// Parquet row group reader API. With this, user can get metadata information about the | ||
|
@@ -66,6 +72,11 @@ pub trait RowGroupReader { | |
|
||
/// Get value reader for the `i`th column chunk | ||
fn get_column_reader(&self, i: usize) -> Result<ColumnReader>; | ||
|
||
/// Get row iterator for this row group, using schema descriptor. | ||
/// Schema descriptor must be a valid subset of the file schema or be an original | ||
/// descriptor, see `FileReader::get_row_iter` for more information. | ||
fn get_row_iter(&self, proj_descr: Rc<SchemaDescriptor>) -> RowIter; | ||
} | ||
|
||
|
||
|
@@ -174,6 +185,27 @@ impl FileReader for SerializedFileReader { | |
let f = self.buf.get_ref().try_clone()?; | ||
Ok(Box::new(SerializedRowGroupReader::new(f, row_group_metadata))) | ||
} | ||
|
||
fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<FileRowIter> { | ||
let file_metadata = self.metadata().file_metadata(); | ||
|
||
// Resolve projected schema (either full file schema or a subset) | ||
let proj_descr = match projection { | ||
Some(projection) => { | ||
// check if projection is part of file schema | ||
let root_schema = file_metadata.schema_descr().root_schema(); | ||
if !root_schema.check_contains(&projection) { | ||
return Err(general_err!("Root schema does not contain projection")); | ||
} | ||
Rc::new(SchemaDescriptor::new(Rc::new(projection))) | ||
}, | ||
None => { | ||
file_metadata.schema_descr_ptr() | ||
} | ||
}; | ||
|
||
Ok(FileRowIter::new(proj_descr, self)) | ||
} | ||
} | ||
|
||
/// A serialized impl for row group reader | ||
|
@@ -183,7 +215,7 @@ pub struct SerializedRowGroupReader { | |
} | ||
|
||
impl SerializedRowGroupReader { | ||
pub fn new(file: File, metadata: RowGroupMetaDataPtr ) -> Self { | ||
pub fn new(file: File, metadata: RowGroupMetaDataPtr) -> Self { | ||
let buf = BufReader::new(file); | ||
Self { buf, metadata } | ||
} | ||
|
@@ -237,6 +269,11 @@ impl RowGroupReader for SerializedRowGroupReader { | |
}; | ||
Ok(col_reader) | ||
} | ||
|
||
fn get_row_iter(&self, proj_descr: Rc<SchemaDescriptor>) -> RowIter { | ||
// Build new tree of readers and return it as a row iterator | ||
TreeBuilder::new().as_row_iter(proj_descr, self) | ||
} | ||
} | ||
|
||
|
||
|
@@ -382,9 +419,7 @@ impl PageReader for SerializedPageReader { | |
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
use std::fs; | ||
use std::env; | ||
use std::io::Write; | ||
use util::test_common::{get_temp_file, get_test_file}; | ||
|
||
#[test] | ||
fn test_file_reader_metadata_size_smaller_than_footer() { | ||
|
@@ -577,33 +612,4 @@ mod tests { | |
} | ||
assert_eq!(page_count, 2); | ||
} | ||
|
||
fn get_test_file(file_name: &str) -> fs::File { | ||
let mut path_buf = env::current_dir().unwrap(); | ||
path_buf.push("data"); | ||
path_buf.push(file_name); | ||
let file = File::open(path_buf.as_path()); | ||
assert!(file.is_ok()); | ||
file.unwrap() | ||
} | ||
|
||
fn get_temp_file(file_name: &str, content: &[u8]) -> fs::File { | ||
// build tmp path to a file in "target/debug/testdata" | ||
let mut path_buf = env::current_dir().unwrap(); | ||
path_buf.push("target"); | ||
path_buf.push("debug"); | ||
path_buf.push("testdata"); | ||
fs::create_dir_all(&path_buf).unwrap(); | ||
path_buf.push(file_name); | ||
|
||
// write file content | ||
let mut tmp_file = File::create(path_buf.as_path()).unwrap(); | ||
tmp_file.write_all(content).unwrap(); | ||
tmp_file.sync_all().unwrap(); | ||
|
||
// read file and return file handle | ||
let file = File::open(path_buf.as_path()); | ||
assert!(file.is_ok()); | ||
file.unwrap() | ||
} | ||
} |
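
`FileReader::get_row_iter` in the diff above accepts an optional projection, rejects it when the root schema does not contain it, and falls back to the full file schema when it is `None`. The sketch below models that resolution step on a toy schema tree. The `Field` type, its `contains` rule, and `resolve_projection` are assumptions made for illustration; whether the crate's `check_contains` applies the same rule (for example, whether it also compares physical types) is not shown in this diff.

```rust
/// Toy schema node: a stand-in for the crate's schema type, not its real definition.
#[derive(Debug, Clone, PartialEq)]
enum Field {
    Primitive { name: String },
    Group { name: String, fields: Vec<Field> },
}

/// Build a primitive (leaf) field.
fn prim(name: &str) -> Field {
    Field::Primitive { name: name.to_string() }
}

/// Build a group field with the given children.
fn group(name: &str, fields: Vec<Field>) -> Field {
    Field::Group { name: name.to_string(), fields }
}

impl Field {
    /// True when `other` has the same name and kind as `self` and every child
    /// of `other` is contained in some child of `self` (assumed rule).
    fn contains(&self, other: &Field) -> bool {
        match (self, other) {
            (Field::Primitive { name: a }, Field::Primitive { name: b }) => a == b,
            (
                Field::Group { name: a, fields: mine },
                Field::Group { name: b, fields: theirs },
            ) => a == b && theirs.iter().all(|t| mine.iter().any(|m| m.contains(t))),
            _ => false,
        }
    }
}

/// Resolve the schema to read: the projection if it is valid, else the full schema.
fn resolve_projection(file_schema: &Field, projection: Option<Field>) -> Result<Field, String> {
    match projection {
        Some(p) => {
            if !file_schema.contains(&p) {
                return Err("Root schema does not contain projection".to_string());
            }
            Ok(p)
        }
        None => Ok(file_schema.clone()),
    }
}

fn main() {
    let schema = group(
        "schema",
        vec![prim("a"), prim("b"), group("c", vec![prim("d"), prim("e")])],
    );

    // Project only column `b` and the nested column `c.d`.
    let projection = group("schema", vec![prim("b"), group("c", vec![prim("d")])]);
    println!("{:?}", resolve_projection(&schema, Some(projection)));

    // A column that the file schema does not have is rejected.
    let unknown = group("schema", vec![prim("x")]);
    println!("{:?}", resolve_projection(&schema, Some(unknown)));
}
```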
Review comment: I think the file name `read-file` is still not specific enough once people do `cargo install`. Perhaps we should add a prefix such as `parquet` or something to differentiate these executables (same for `dump-schema`)?
Reply: I can rename them to `parquet-read` and `parquet-schema`. Will that work?
Reply: Yes, that should be better 👍