Skip to content

Commit

Permalink
Support smallest_decimal_enabled flag in pyarrow
Browse files Browse the repository at this point in the history
  • Loading branch information
curioustien committed Feb 17, 2025
1 parent d81cf13 commit 424472f
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 19 deletions.
4 changes: 3 additions & 1 deletion cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ parquet::ArrowReaderProperties MakeArrowReaderProperties(
parquet_scan_options.arrow_reader_properties->cache_options());
arrow_properties.set_io_context(
parquet_scan_options.arrow_reader_properties->io_context());
arrow_properties.set_smallest_decimal_enabled(
parquet_scan_options.arrow_reader_properties->smallest_decimal_enabled());
arrow_properties.set_use_threads(options.use_threads);
return arrow_properties;
}
Expand Down Expand Up @@ -532,7 +534,7 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
metadata)
.Then(
[=](const std::unique_ptr<parquet::ParquetFileReader>& reader) mutable
-> Result<std::shared_ptr<parquet::arrow::FileReader>> {
-> Result<std::shared_ptr<parquet::arrow::FileReader>> {
auto arrow_properties = MakeArrowReaderProperties(
*self, *reader->metadata(), *options, *parquet_scan_options);

Expand Down
1 change: 0 additions & 1 deletion cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.

#include "parquet/properties.h"
#ifdef _MSC_VER
# pragma warning(push)
// Disable forcing value to bool warnings
Expand Down
16 changes: 8 additions & 8 deletions cpp/src/parquet/arrow/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,9 @@ class FileWriterImpl : public FileWriter {
}
}

Status Init(const ArrowReaderProperties& schema_arrow_reader_properities) {
Status Init(const ArrowReaderProperties& schema_arrow_reader_properties) {
return SchemaManifest::Make(writer_->schema(), /*schema_metadata=*/nullptr,
schema_arrow_reader_properities, &schema_manifest_);
schema_arrow_reader_properties, &schema_manifest_);
}

Status NewRowGroup() override {
Expand Down Expand Up @@ -516,10 +516,10 @@ Status FileWriter::Make(::arrow::MemoryPool* pool,
std::shared_ptr<::arrow::Schema> schema,
std::shared_ptr<ArrowWriterProperties> arrow_properties,
std::unique_ptr<FileWriter>* out,
const ArrowReaderProperties& schema_arrow_reader_properities) {
const ArrowReaderProperties& schema_arrow_reader_properties) {
std::unique_ptr<FileWriterImpl> impl(new FileWriterImpl(
std::move(schema), pool, std::move(writer), std::move(arrow_properties)));
RETURN_NOT_OK(impl->Init(schema_arrow_reader_properities));
RETURN_NOT_OK(impl->Init(schema_arrow_reader_properties));
*out = std::move(impl);
return Status::OK();
}
Expand Down Expand Up @@ -556,7 +556,7 @@ Result<std::unique_ptr<FileWriter>> FileWriter::Open(
std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<WriterProperties> properties,
std::shared_ptr<ArrowWriterProperties> arrow_properties,
const ArrowReaderProperties& schema_arrow_reader_properities) {
const ArrowReaderProperties& schema_arrow_reader_properties) {
std::shared_ptr<SchemaDescriptor> parquet_schema;
RETURN_NOT_OK(
ToParquetSchema(&schema, *properties, *arrow_properties, &parquet_schema));
Expand All @@ -575,7 +575,7 @@ Result<std::unique_ptr<FileWriter>> FileWriter::Open(
auto schema_ptr = std::make_shared<::arrow::Schema>(schema);
RETURN_NOT_OK(Make(pool, std::move(base_writer), std::move(schema_ptr),
std::move(arrow_properties), &writer,
schema_arrow_reader_properities));
schema_arrow_reader_properties));

return writer;
}
Expand All @@ -596,12 +596,12 @@ Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
std::shared_ptr<::arrow::io::OutputStream> sink, int64_t chunk_size,
std::shared_ptr<WriterProperties> properties,
std::shared_ptr<ArrowWriterProperties> arrow_properties,
const ArrowReaderProperties& schema_arrow_reader_properities) {
const ArrowReaderProperties& schema_arrow_reader_properties) {
std::unique_ptr<FileWriter> writer;
ARROW_ASSIGN_OR_RAISE(
writer,
FileWriter::Open(*table.schema(), pool, std::move(sink), std::move(properties),
std::move(arrow_properties), schema_arrow_reader_properities));
std::move(arrow_properties), schema_arrow_reader_properties));
RETURN_NOT_OK(writer->WriteTable(table, chunk_size));
return writer->Close();
}
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/parquet/arrow/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class PARQUET_EXPORT FileWriter {
std::shared_ptr<::arrow::Schema> schema,
std::shared_ptr<ArrowWriterProperties> arrow_properties,
std::unique_ptr<FileWriter>* out,
const ArrowReaderProperties& schema_arrow_reader_properities =
const ArrowReaderProperties& schema_arrow_reader_properties =
default_arrow_reader_properties());

/// \brief Try to create an Arrow to Parquet file writer.
Expand All @@ -76,7 +76,7 @@ class PARQUET_EXPORT FileWriter {
std::shared_ptr<WriterProperties> properties = default_writer_properties(),
std::shared_ptr<ArrowWriterProperties> arrow_properties =
default_arrow_writer_properties(),
const ArrowReaderProperties& schema_arrow_reader_properities =
const ArrowReaderProperties& schema_arrow_reader_properties =
default_arrow_reader_properties());

/// Return the Arrow schema to be written to.
Expand Down Expand Up @@ -183,7 +183,7 @@ WriteTable(const ::arrow::Table& table, MemoryPool* pool,
std::shared_ptr<WriterProperties> properties = default_writer_properties(),
std::shared_ptr<ArrowWriterProperties> arrow_properties =
default_arrow_writer_properties(),
const ArrowReaderProperties& schema_arrow_reader_properities =
const ArrowReaderProperties& schema_arrow_reader_properties =
default_arrow_reader_properties());

} // namespace arrow
Expand Down
19 changes: 17 additions & 2 deletions python/pyarrow/_dataset_parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
cache_options : pyarrow.CacheOptions, default None
Cache options used when pre_buffer is enabled. The default values should
be good for most use cases. You may want to adjust these for example if
you have exceptionally high latency to the file system.
you have exceptionally high latency to the file system.
thrift_string_size_limit : int, default None
If not None, override the maximum total string size allocated
when decoding Thrift structures. The default limit should be
Expand All @@ -720,6 +720,11 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
Parquet file.
page_checksum_verification : bool, default False
If True, verify the page checksum for each page read from the file.
smallest_decimal_enabled : bool, default False
If True, always convert to the smallest arrow decimal type based
on precision.
"""

# Avoid mistakenly creating attributes
Expand All @@ -733,7 +738,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
thrift_container_size_limit=None,
decryption_config=None,
decryption_properties=None,
bint page_checksum_verification=False):
bint page_checksum_verification=False,
bint smallest_decimal_enabled=False):
self.init(shared_ptr[CFragmentScanOptions](
new CParquetFragmentScanOptions()))
self.use_buffered_stream = use_buffered_stream
Expand All @@ -752,6 +758,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
if decryption_properties is not None:
self.decryption_properties = decryption_properties
self.page_checksum_verification = page_checksum_verification
self.smallest_decimal_enabled = smallest_decimal_enabled

cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
FragmentScanOptions.init(self, sp)
Expand Down Expand Up @@ -868,6 +875,14 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
def page_checksum_verification(self, bint page_checksum_verification):
self.reader_properties().set_page_checksum_verification(page_checksum_verification)

@property
def smallest_decimal_enabled(self):
return self.arrow_reader_properties().smallest_decimal_enabled()

@smallest_decimal_enabled.setter
def smallest_decimal_enabled(self, bint smallest_decimal_enabled):
self.arrow_reader_properties().set_smallest_decimal_enabled(smallest_decimal_enabled)

def equals(self, ParquetFragmentScanOptions other):
"""
Parameters
Expand Down
2 changes: 2 additions & 0 deletions python/pyarrow/_parquet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,8 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
CCacheOptions cache_options() const
void set_coerce_int96_timestamp_unit(TimeUnit unit)
TimeUnit coerce_int96_timestamp_unit() const
void set_smallest_decimal_enabled(c_bool smallest_decimal_enabled)
c_bool smallest_decimal_enabled() const

ArrowReaderProperties default_arrow_reader_properties()

Expand Down
5 changes: 4 additions & 1 deletion python/pyarrow/_parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1441,7 +1441,8 @@ cdef class ParquetReader(_Weakrefable):
FileDecryptionProperties decryption_properties=None,
thrift_string_size_limit=None,
thrift_container_size_limit=None,
page_checksum_verification=False):
page_checksum_verification=False,
smallest_decimal_enabled=False):
"""
Open a parquet file for reading.
Expand All @@ -1458,6 +1459,7 @@ cdef class ParquetReader(_Weakrefable):
thrift_string_size_limit : int, optional
thrift_container_size_limit : int, optional
page_checksum_verification : bool, default False
smallest_decimal_enabled : bool, default False
"""
cdef:
shared_ptr[CFileMetaData] c_metadata
Expand Down Expand Up @@ -1497,6 +1499,7 @@ cdef class ParquetReader(_Weakrefable):
decryption_properties.unwrap())

arrow_props.set_pre_buffer(pre_buffer)
arrow_props.set_smallest_decimal_enabled(smallest_decimal_enabled)

properties.set_page_checksum_verification(page_checksum_verification)

Expand Down
21 changes: 18 additions & 3 deletions python/pyarrow/parquet/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,9 @@ class ParquetFile:
it will be parsed as an URI to determine the filesystem.
page_checksum_verification : bool, default False
If True, verify the checksum for each page read from the file.
smallest_decimal_enabled : bool, default False
If True, always convert to the smallest arrow decimal type based
on precision.
Examples
--------
Expand Down Expand Up @@ -303,7 +306,7 @@ def __init__(self, source, *, metadata=None, common_metadata=None,
pre_buffer=False, coerce_int96_timestamp_unit=None,
decryption_properties=None, thrift_string_size_limit=None,
thrift_container_size_limit=None, filesystem=None,
page_checksum_verification=False):
page_checksum_verification=False, smallest_decimal_enabled=False):

self._close_source = getattr(source, 'closed', True)

Expand All @@ -323,6 +326,7 @@ def __init__(self, source, *, metadata=None, common_metadata=None,
thrift_string_size_limit=thrift_string_size_limit,
thrift_container_size_limit=thrift_container_size_limit,
page_checksum_verification=page_checksum_verification,
smallest_decimal_enabled=smallest_decimal_enabled,
)
self.common_metadata = common_metadata
self._nested_paths_by_prefix = self._build_nested_paths()
Expand Down Expand Up @@ -1267,6 +1271,9 @@ class ParquetDataset:
If True, verify the page checksum for each page read from the file.
use_legacy_dataset : bool, optional
Deprecated and has no effect from PyArrow version 15.0.0.
smallest_decimal_enabled : bool, default False
If True, always convert to the smallest arrow decimal type based
on precision.
Examples
--------
Expand All @@ -1280,7 +1287,8 @@ def __init__(self, path_or_paths, filesystem=None, schema=None, *, filters=None,
decryption_properties=None, thrift_string_size_limit=None,
thrift_container_size_limit=None,
page_checksum_verification=False,
use_legacy_dataset=None):
use_legacy_dataset=None,
smallest_decimal_enabled=False):

if use_legacy_dataset is not None:
warnings.warn(
Expand All @@ -1297,6 +1305,7 @@ def __init__(self, path_or_paths, filesystem=None, schema=None, *, filters=None,
"thrift_string_size_limit": thrift_string_size_limit,
"thrift_container_size_limit": thrift_container_size_limit,
"page_checksum_verification": page_checksum_verification,
"smallest_decimal_enabled": smallest_decimal_enabled,
}
if buffer_size:
read_options.update(use_buffered_stream=True,
Expand Down Expand Up @@ -1686,6 +1695,9 @@ def partitioning(self):
sufficient for most Parquet files.
page_checksum_verification : bool, default False
If True, verify the checksum for each page read from the file.
smallest_decimal_enabled : bool, default False
If True, always convert to the smallest arrow decimal type based
on precision.
Returns
-------
Expand Down Expand Up @@ -1781,7 +1793,8 @@ def read_table(source, *, columns=None, use_threads=True,
coerce_int96_timestamp_unit=None,
decryption_properties=None, thrift_string_size_limit=None,
thrift_container_size_limit=None,
page_checksum_verification=False):
page_checksum_verification=False,
smallest_decimal_enabled=False):

if use_legacy_dataset is not None:
warnings.warn(
Expand All @@ -1806,6 +1819,7 @@ def read_table(source, *, columns=None, use_threads=True,
thrift_string_size_limit=thrift_string_size_limit,
thrift_container_size_limit=thrift_container_size_limit,
page_checksum_verification=page_checksum_verification,
smallest_decimal_enabled=smallest_decimal_enabled,
)
except ImportError:
# fall back on ParquetFile for simple cases when pyarrow.dataset
Expand Down Expand Up @@ -1838,6 +1852,7 @@ def read_table(source, *, columns=None, use_threads=True,
thrift_string_size_limit=thrift_string_size_limit,
thrift_container_size_limit=thrift_container_size_limit,
page_checksum_verification=page_checksum_verification,
smallest_decimal_enabled=smallest_decimal_enabled,
)

return dataset.read(columns=columns, use_threads=use_threads,
Expand Down
14 changes: 14 additions & 0 deletions python/pyarrow/tests/parquet/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,20 @@ def test_byte_stream_split():
use_dictionary=False)


def test_smallest_decimal_enabled(tempdir):
arr1 = pa.array(list(map(Decimal, range(100))), type=pa.decimal32(5, 2))
arr2 = pa.array(list(map(Decimal, range(100))), type=pa.decimal64(16, 9))
arr3 = pa.array(list(map(Decimal, range(100))), type=pa.decimal128(22, 2))
arr4 = pa.array(list(map(Decimal, range(100))), type=pa.decimal256(48, 2))
data_decimal = [arr1, arr2, arr3, arr4]
table = pa.Table.from_arrays(data_decimal, names=['a', 'b', 'c', 'd'])

# Check with smallest_decimal_enabled
_check_roundtrip(table,
expected=table,
read_table_kwargs={"smallest_decimal_enabled": True})


def test_store_decimal_as_integer(tempdir):
arr_decimal_1_9 = pa.array(list(map(Decimal, range(100))),
type=pa.decimal128(5, 2))
Expand Down

0 comments on commit 424472f

Please sign in to comment.