From 9bcf552334f46132602bc814acbbde44185694aa Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 17 Jul 2024 17:40:01 -0400 Subject: [PATCH] Example of reading and writing parquet metadata outside the file --- parquet/Cargo.toml | 6 + parquet/examples/external_metadata.rs | 155 ++++++++++++++++++++++++++ 2 files changed, 161 insertions(+) create mode 100644 parquet/examples/external_metadata.rs diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index b97b2a571646..3541888b9475 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -118,6 +118,12 @@ zstd = ["dep:zstd", "zstd-sys"] # Display memory in example/write_parquet.rs sysinfo = ["dep:sysinfo"] + +[[example]] +name = "external_metadata" +required-features = ["arrow", "async"] +path = "./examples/external_metadata.rs" + [[example]] name = "read_parquet" required-features = ["arrow"] diff --git a/parquet/examples/external_metadata.rs b/parquet/examples/external_metadata.rs new file mode 100644 index 000000000000..64b4c465a368 --- /dev/null +++ b/parquet/examples/external_metadata.rs @@ -0,0 +1,155 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_array::RecordBatch; +use arrow_cast::pretty::pretty_format_batches; +use parquet::arrow::arrow_reader::{ + ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, +}; +use parquet::arrow::ParquetRecordBatchStreamBuilder; +use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter}; +use std::path::Path; + +/// This example demonstrates advanced usage of the Parquet metadata APIs. +/// +/// It shows how you can store Parquet metadata somewhere other than the Parquet +/// file itself, and then use that metadata to read a file. This can be used, +/// for example, to store metadata for parquet files on remote object storage +/// (e.g. S3) in a local file or an in-memory cache, use a query engine like +/// DataFusion to analyze the metadata to determine which files may contain +/// relevant data, and then read only those the required files with a single +/// object store request. +/// +/// The example: +/// 1. Reads the metadata of a Parquet file +/// 2. Removes some column statistics from the metadata (to make them smaller) +/// 3. Stores the metadata in a separate file +/// 4. Reads the metadata from the separate file and uses that to read the Parquet file +/// +/// Without this API, to implement the functionality you would need to implement +/// a conversion of the `ParquetMetaData` and related structures to/from some +/// other structs that can be serialized/deserialized. + +#[tokio::main(flavor = "current_thread")] +async fn main() -> parquet::errors::Result<()> { + let testdata = arrow::util::test_util::parquet_test_data(); + let parquet_path = format!("{testdata}/alltypes_plain.parquet"); + let metadata_path = "thrift_metadata.dat"; // todo tempdir for now use local file to inspect it + + let metadata = get_metadata_from_parquet_file(&parquet_path).await; + println!( + "Read metadata from Parquet file into memory: {} bytes", + metadata.memory_size() + ); + let metadata = prepare_metadata(metadata); + write_metadata_to_file(metadata, &metadata_path); + + // now read the metadata from the file and use it to read the Parquet file + let metadata = read_metadata_from_file(&metadata_path); + println!("Read metadata from file: {metadata:#?}"); + + let batches = read_parquet_file_with_metadata(&parquet_path, metadata); + + // display the results + let batches_string = pretty_format_batches(&batches).unwrap().to_string(); + let batches_lines: Vec<_> = batches_string.split('\n').collect(); + + assert_eq!(batches_lines, + [ + "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+", + "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col | string_col | timestamp_col |", + "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+", + "| 4 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30332f30312f3039 | 30 | 2009-03-01T00:00:00 |", + "| 5 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30332f30312f3039 | 31 | 2009-03-01T00:01:00 |", + "| 6 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30342f30312f3039 | 30 | 2009-04-01T00:00:00 |", + "| 7 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30342f30312f3039 | 31 | 2009-04-01T00:01:00 |", + "| 2 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30322f30312f3039 | 30 | 2009-02-01T00:00:00 |", + "| 3 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30322f30312f3039 | 31 | 2009-02-01T00:01:00 |", + "| 0 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30312f30312f3039 | 30 | 2009-01-01T00:00:00 |", + "| 1 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30312f30312f3039 | 31 | 2009-01-01T00:01:00 |", + "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+", + ] + + , "actual output:\n\n{batches_lines:#?}"); + + Ok(()) +} + +/// Reads the metadata from a parquet file +async fn get_metadata_from_parquet_file(file: impl AsRef) -> ParquetMetaData { + // pretend we are reading the metadata from a remote object store + let file = std::fs::File::open(file).unwrap(); + let file = tokio::fs::File::from_std(file); + + let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap(); + + // The metadata is Arc'd -- since we are going to modify it we + // need to clone it + builder.metadata().as_ref().clone() +} + +/// modifies the metadata to reduce its size +fn prepare_metadata(metadata: ParquetMetaData) -> ParquetMetaData { + // maybe we will do this + metadata +} + +/// writes the metadata to a file +/// +/// The data is stored using the same thrift format as the Parquet file metadata +fn write_metadata_to_file(metadata: ParquetMetaData, file: impl AsRef) { + let file = std::fs::File::create(file).unwrap(); + let writer = ParquetMetaDataWriter::new(file, &metadata); + writer.finish().unwrap() +} + +/// Reads the metadata from a file +/// +/// This function reads the format written by `write_metadata_to_file` +fn read_metadata_from_file(file: impl AsRef) -> ParquetMetaData { + let mut file = std::fs::File::open(file).unwrap(); + ParquetMetaDataReader::new() + .with_column_indexes(true) + .with_offset_indexes(true) + .parse_and_finish(&mut file) + .unwrap() +} + +/// Reads the Parquet file using the metadata +/// +/// This shows how to read the Parquet file using previously read metadata +/// instead of the metadata in the Parquet file itself. This avoids an IO / +/// having to fetch and decode the metadata from the Parquet file before +/// beginning to read it. +/// +/// In this example, we read the results as Arrow record batches +fn read_parquet_file_with_metadata( + file: impl AsRef, + metadata: ParquetMetaData, +) -> Vec { + let file = std::fs::File::open(file).unwrap(); + let options = ArrowReaderOptions::new() + // tell the reader to read the page index + .with_page_index(true); + // create a reader with pre-existing metadata + let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap(); + let reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, arrow_reader_metadata) + .build() + .unwrap(); + + reader.collect::>>().unwrap() +}