Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for level histograms added in PARQUET-2261 to ParquetMetaData #6105

Merged
merged 27 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
741bbf6
bump `tonic` to 0.12 and `prost` to 0.13 for `arrow-flight` (#6041)
BugenZhao Jul 16, 2024
8f76248
Remove `impl<T: AsRef<[u8]>> From<T> for Buffer` that easily acciden…
XiangpengHao Jul 16, 2024
bb5f12b
Make display of interval types more pretty (#6006)
Rachelint Jul 16, 2024
756b1fb
Update snafu (#5930)
Jesse-Bakker Jul 16, 2024
fe04e09
Update Parquet thrift generated structures (#6045)
etseidl Jul 16, 2024
2e7f7ef
Revert "Revert "Write Bloom filters between row groups instead of the…
alamb Jul 16, 2024
effccc1
Revert "Update snafu (#5930)" (#6069)
alamb Jul 16, 2024
649d09d
Update pyo3 requirement from 0.21.1 to 0.22.1 (fixed) (#6075)
crepererum Jul 17, 2024
05e681d
remove repeated codes to make the codes more concise. (#6080)
Rachelint Jul 18, 2024
e40b311
Add `unencoded_byte_array_data_bytes` to `ParquetMetaData` (#6068)
etseidl Jul 19, 2024
41b8f66
deprecate read_page_locations
etseidl Jul 19, 2024
c4c0a4d
add level histograms to metadata
etseidl Jul 19, 2024
1507c46
add to_thrift() to OffsetIndexMetaData
etseidl Jul 22, 2024
23cc77e
Merge branch 'deprecate_read_page_locations' into size_stats_histograms
etseidl Jul 22, 2024
81c34ac
Update pyo3 requirement from 0.21.1 to 0.22.2 (#6085)
dependabot[bot] Jul 23, 2024
3bc9987
Deprecate read_page_locations() and simplify offset index in `Parquet…
etseidl Jul 23, 2024
8369436
Merge remote-tracking branch 'origin/53.0.0-dev' into size_stats_hist…
etseidl Jul 23, 2024
bf4dadb
move valid test into ColumnIndexBuilder::append_histograms
etseidl Jul 25, 2024
4002464
move update_histogram() inside ColumnMetrics
etseidl Jul 25, 2024
095130f
Merge remote-tracking branch 'apache/master' into 53.0.0-dev
alamb Jul 25, 2024
a6353d1
Update parquet/src/column/writer/mod.rs
alamb Jul 25, 2024
f892613
Merge remote-tracking branch 'origin/53.0.0-dev' into size_stats_hist…
etseidl Jul 25, 2024
978264d
Implement LevelHistograms as a struct
alamb Jul 26, 2024
642c7d4
Merge remote-tracking branch 'origin/master' into size_stats_histograms
etseidl Jul 26, 2024
9582e0d
Merge pull request #1 from alamb/alamb/level_histogram_structs
etseidl Jul 26, 2024
70af5dd
formatting
etseidl Jul 26, 2024
e774bbe
fix error in docs
etseidl Jul 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 23 additions & 10 deletions arrow-buffer/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,16 +356,29 @@ impl Buffer {
}
}

/// Creating a `Buffer` instance by copying the memory from a `AsRef<[u8]>` into a newly
/// allocated memory region.
impl<T: AsRef<[u8]>> From<T> for Buffer {
fn from(p: T) -> Self {
// allocate aligned memory buffer
let slice = p.as_ref();
let len = slice.len();
let mut buffer = MutableBuffer::new(len);
buffer.extend_from_slice(slice);
buffer.into()
/// Note that here we deliberately do not implement
/// `impl<T: AsRef<[u8]>> From<T> for Buffer`
/// As it would accept `Buffer::from(vec![...])` that would cause an unexpected copy.
/// Instead, we ask user to be explicit when copying is occurring, e.g., `Buffer::from(vec![...].to_byte_slice())`.
/// For zero-copy conversion, user should use `Buffer::from_vec(vec![...])`.
///
/// Since we removed impl for `AsRef<u8>`, we added the following three specific implementations to reduce API breakage.
/// See <https://github.com/apache/arrow-rs/issues/6033> for more discussion on this.
impl From<&[u8]> for Buffer {
fn from(p: &[u8]) -> Self {
Self::from_slice_ref(p)
}
}

impl<const N: usize> From<[u8; N]> for Buffer {
fn from(p: [u8; N]) -> Self {
Self::from_slice_ref(p)
}
}

impl<const N: usize> From<&[u8; N]> for Buffer {
fn from(p: &[u8; N]) -> Self {
Self::from_slice_ref(p)
}
}

Expand Down
26 changes: 13 additions & 13 deletions arrow-cast/src/cast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4314,8 +4314,8 @@ mod tests {
IntervalUnit::YearMonth,
IntervalYearMonthArray,
vec![
Some("1 years 1 mons 0 days 0 hours 0 mins 0.00 secs"),
Some("2 years 7 mons 0 days 0 hours 0 mins 0.00 secs"),
Some("1 years 1 mons"),
Some("2 years 7 mons"),
None,
None,
None,
Expand All @@ -4338,9 +4338,9 @@ mod tests {
IntervalUnit::DayTime,
IntervalDayTimeArray,
vec![
Some("0 years 0 mons 390 days 0 hours 0 mins 0.000 secs"),
Some("0 years 0 mons 930 days 0 hours 0 mins 0.000 secs"),
Some("0 years 0 mons 30 days 0 hours 0 mins 0.000 secs"),
Some("390 days"),
Some("930 days"),
Some("30 days"),
None,
None,
]
Expand All @@ -4366,16 +4366,16 @@ mod tests {
IntervalUnit::MonthDayNano,
IntervalMonthDayNanoArray,
vec![
Some("0 years 13 mons 1 days 0 hours 0 mins 0.000000000 secs"),
Some("13 mons 1 days"),
None,
Some("0 years 31 mons 35 days 0 hours 0 mins 0.001400000 secs"),
Some("0 years 0 mons 3 days 0 hours 0 mins 0.000000000 secs"),
Some("0 years 0 mons 0 days 0 hours 0 mins 8.000000000 secs"),
Some("31 mons 35 days 0.001400000 secs"),
Some("3 days"),
Some("8.000000000 secs"),
None,
Some("0 years 0 mons 1 days 0 hours 0 mins 29.800000000 secs"),
Some("0 years 3 mons 0 days 0 hours 0 mins 1.000000000 secs"),
Some("0 years 0 mons 0 days 0 hours 8 mins 0.000000000 secs"),
Some("0 years 63 mons 9 days 19 hours 9 mins 2.222000000 secs"),
Some("1 days 29.800000000 secs"),
Some("3 mons 1.000000000 secs"),
Some("8 mins"),
Some("63 mons 9 days 19 hours 9 mins 2.222000000 secs"),
None,
]
);
Expand Down
155 changes: 115 additions & 40 deletions arrow-cast/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,73 +654,148 @@ impl<'a> DisplayIndex for &'a PrimitiveArray<IntervalYearMonthType> {
let years = (interval / 12_f64).floor();
let month = interval - (years * 12_f64);

write!(
f,
"{years} years {month} mons 0 days 0 hours 0 mins 0.00 secs",
)?;
write!(f, "{years} years {month} mons",)?;
Ok(())
}
}

impl<'a> DisplayIndex for &'a PrimitiveArray<IntervalDayTimeType> {
fn write(&self, idx: usize, f: &mut dyn Write) -> FormatResult {
let value = self.value(idx);
let mut prefix = "";

let secs = value.milliseconds / 1_000;
if value.days != 0 {
write!(f, "{prefix}{} days", value.days)?;
prefix = " ";
}

if value.milliseconds != 0 {
let millis_fmt = MillisecondsFormatter {
milliseconds: value.milliseconds,
prefix,
};

f.write_fmt(format_args!("{millis_fmt}"))?;
}

Ok(())
}
}

impl<'a> DisplayIndex for &'a PrimitiveArray<IntervalMonthDayNanoType> {
fn write(&self, idx: usize, f: &mut dyn Write) -> FormatResult {
let value = self.value(idx);
let mut prefix = "";

if value.months != 0 {
write!(f, "{prefix}{} mons", value.months)?;
prefix = " ";
}

if value.days != 0 {
write!(f, "{prefix}{} days", value.days)?;
prefix = " ";
}

if value.nanoseconds != 0 {
let nano_fmt = NanosecondsFormatter {
nanoseconds: value.nanoseconds,
prefix,
};
f.write_fmt(format_args!("{nano_fmt}"))?;
}

Ok(())
}
}

struct NanosecondsFormatter<'a> {
nanoseconds: i64,
prefix: &'a str,
}

impl<'a> Display for NanosecondsFormatter<'a> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut prefix = self.prefix;

let secs = self.nanoseconds / 1_000_000_000;
let mins = secs / 60;
let hours = mins / 60;

let secs = secs - (mins * 60);
let mins = mins - (hours * 60);

let milliseconds = value.milliseconds % 1_000;
let nanoseconds = self.nanoseconds % 1_000_000_000;

let secs_sign = if secs < 0 || milliseconds < 0 {
"-"
} else {
""
};
if hours != 0 {
write!(f, "{prefix}{} hours", hours)?;
prefix = " ";
}

if mins != 0 {
write!(f, "{prefix}{} mins", mins)?;
prefix = " ";
}

if secs != 0 || nanoseconds != 0 {
let secs_sign = if secs < 0 || nanoseconds < 0 { "-" } else { "" };
write!(
f,
"{prefix}{}{}.{:09} secs",
secs_sign,
secs.abs(),
nanoseconds.abs()
)?;
}

write!(
f,
"0 years 0 mons {} days {} hours {} mins {}{}.{:03} secs",
value.days,
hours,
mins,
secs_sign,
secs.abs(),
milliseconds.abs(),
)?;
Ok(())
}
}

impl<'a> DisplayIndex for &'a PrimitiveArray<IntervalMonthDayNanoType> {
fn write(&self, idx: usize, f: &mut dyn Write) -> FormatResult {
let value = self.value(idx);
struct MillisecondsFormatter<'a> {
milliseconds: i32,
prefix: &'a str,
}

impl<'a> Display for MillisecondsFormatter<'a> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut prefix = self.prefix;

let secs = value.nanoseconds / 1_000_000_000;
let secs = self.milliseconds / 1_000;
let mins = secs / 60;
let hours = mins / 60;

let secs = secs - (mins * 60);
let mins = mins - (hours * 60);

let nanoseconds = value.nanoseconds % 1_000_000_000;

let secs_sign = if secs < 0 || nanoseconds < 0 { "-" } else { "" };

write!(
f,
"0 years {} mons {} days {} hours {} mins {}{}.{:09} secs",
value.months,
value.days,
hours,
mins,
secs_sign,
secs.abs(),
nanoseconds.abs(),
)?;
let milliseconds = self.milliseconds % 1_000;

if hours != 0 {
write!(f, "{prefix}{} hours", hours,)?;
prefix = " ";
}

if mins != 0 {
write!(f, "{prefix}{} mins", mins,)?;
prefix = " ";
}

if secs != 0 || milliseconds != 0 {
let secs_sign = if secs < 0 || milliseconds < 0 {
"-"
} else {
""
};

write!(
f,
"{prefix}{}{}.{:03} secs",
secs_sign,
secs.abs(),
milliseconds.abs()
)?;
}

Ok(())
}
}
Expand Down
54 changes: 27 additions & 27 deletions arrow-cast/src/pretty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -986,16 +986,16 @@ mod tests {
let table = pretty_format_batches(&[batch]).unwrap().to_string();

let expected = vec![
"+----------------------------------------------------+",
"| IntervalDayTime |",
"+----------------------------------------------------+",
"| 0 years 0 mons -1 days 0 hours -10 mins 0.000 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins -1.001 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins -0.001 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 0.001 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 0.010 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 0.100 secs |",
"+----------------------------------------------------+",
"+------------------+",
"| IntervalDayTime |",
"+------------------+",
"| -1 days -10 mins |",
"| -1.001 secs |",
"| -0.001 secs |",
"| 0.001 secs |",
"| 0.010 secs |",
"| 0.100 secs |",
"+------------------+",
];

let actual: Vec<&str> = table.lines().collect();
Expand Down Expand Up @@ -1032,23 +1032,23 @@ mod tests {
let table = pretty_format_batches(&[batch]).unwrap().to_string();

let expected = vec![
"+-----------------------------------------------------------+",
"| IntervalMonthDayNano |",
"+-----------------------------------------------------------+",
"| 0 years -1 mons -1 days 0 hours -10 mins 0.000000000 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins -1.000000001 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins -0.000000001 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 0.000000001 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 0.000000010 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 0.000000100 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 0.000001000 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 0.000010000 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 0.000100000 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 0.001000000 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 0.010000000 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 0.100000000 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 1.000000000 secs |",
"+-----------------------------------------------------------+",
"+--------------------------+",
"| IntervalMonthDayNano |",
"+--------------------------+",
"| -1 mons -1 days -10 mins |",
"| -1.000000001 secs |",
"| -0.000000001 secs |",
"| 0.000000001 secs |",
"| 0.000000010 secs |",
"| 0.000000100 secs |",
"| 0.000001000 secs |",
"| 0.000010000 secs |",
"| 0.000100000 secs |",
"| 0.001000000 secs |",
"| 0.010000000 secs |",
"| 0.100000000 secs |",
"| 1.000000000 secs |",
"+--------------------------+",
];

let actual: Vec<&str> = table.lines().collect();
Expand Down
11 changes: 6 additions & 5 deletions arrow-flight/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ bytes = { version = "1", default-features = false }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
once_cell = { version = "1", optional = true }
paste = { version = "1.0" }
prost = { version = "0.12.3", default-features = false, features = ["prost-derive"] }
prost = { version = "0.13.1", default-features = false, features = ["prost-derive"] }
# For Timestamp type
prost-types = { version = "0.12.3", default-features = false }
prost-types = { version = "0.13.1", default-features = false }
tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "rt-multi-thread"] }
tonic = { version = "0.11.0", default-features = false, features = ["transport", "codegen", "prost"] }
tonic = { version = "0.12.0", default-features = false, features = ["transport", "codegen", "prost"] }

# CLI-related dependencies
anyhow = { version = "1.0", optional = true }
Expand All @@ -70,8 +70,9 @@ cli = ["anyhow", "arrow-cast/prettyprint", "clap", "tracing-log", "tracing-subsc
[dev-dependencies]
arrow-cast = { workspace = true, features = ["prettyprint"] }
assert_cmd = "2.0.8"
http = "0.2.9"
http-body = "0.4.5"
http = "1.1.0"
http-body = "1.0.0"
hyper-util = "0.1"
pin-project-lite = "0.2"
tempfile = "3.3"
tokio-stream = { version = "0.1", features = ["net"] }
Expand Down
6 changes: 4 additions & 2 deletions arrow-flight/examples/flight_sql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,8 @@ impl ProstMessageExt for FetchResults {
#[cfg(test)]
mod tests {
use super::*;
use futures::TryStreamExt;
use futures::{TryFutureExt, TryStreamExt};
use hyper_util::rt::TokioIo;
use std::fs;
use std::future::Future;
use std::net::SocketAddr;
Expand Down Expand Up @@ -843,7 +844,8 @@ mod tests {
.serve_with_incoming(stream);

let request_future = async {
let connector = service_fn(move |_| UnixStream::connect(path.clone()));
let connector =
service_fn(move |_| UnixStream::connect(path.clone()).map_ok(TokioIo::new));
let channel = Endpoint::try_from("http://example.com")
.unwrap()
.connect_with_connector(connector)
Expand Down
Loading
Loading