Skip to content

Commit

Permalink
Move parquet-specific options to parquet::WriterOptions (facebookincu…
Browse files Browse the repository at this point in the history
…bator#10470)

Summary:
Pull Request resolved: facebookincubator#10470

Moving parquet-specific options to the new parquet WriterOptions
polymorphic type to remove file format specific configuration code in the
general Hive connector.

Reviewed By: Yuhta

Differential Revision: D59710079

fbshipit-source-id: b95cc7c1a266fd2c278e654f3028d420301fb85b
  • Loading branch information
pedroerp authored and facebook-github-bot committed Jul 22, 2024
1 parent 0c00c93 commit 8156d7d
Show file tree
Hide file tree
Showing 13 changed files with 111 additions and 64 deletions.
11 changes: 0 additions & 11 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,17 +296,6 @@ uint8_t HiveConfig::readTimestampUnit(const Config* session) const {
return unit;
}

/// Looks up the timestamp unit used for Parquet writes. The connector-level
/// value (with 9 as its own fallback) is read first and then handed to the
/// session lookup as its default. The result must be one of 0 (second),
/// 3 (milli), 6 (micro) or 9 (nano); any other value fails the check.
uint8_t HiveConfig::parquetWriteTimestampUnit(const Config* session) const {
  const auto connectorUnit =
      config_->get<uint8_t>(kParquetWriteTimestampUnit, 9 /*nano*/);
  const auto unit =
      session->get<uint8_t>(kParquetWriteTimestampUnitSession, connectorUnit);
  // The condition is kept inline in the macro so that any diagnostic derived
  // from the expression text stays the same.
  VELOX_CHECK(
      unit == 0 /*second*/ || unit == 3 /*milli*/ || unit == 6 /*micro*/ ||
          unit == 9 /*nano*/,
      "Invalid timestamp unit.");
  return unit;
}

bool HiveConfig::cacheNoRetention(const Config* session) const {
return session->get<bool>(
kCacheNoRetentionSession,
Expand Down
10 changes: 0 additions & 10 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,6 @@ class HiveConfig {
static constexpr const char* kReadTimestampUnitSession =
"hive.reader.timestamp_unit";

/// Timestamp unit for Parquet write through Arrow bridge.
static constexpr const char* kParquetWriteTimestampUnit =
"hive.parquet.writer.timestamp-unit";
static constexpr const char* kParquetWriteTimestampUnitSession =
"hive.parquet.writer.timestamp_unit";

static constexpr const char* kCacheNoRetention = "cache.no_retention";
static constexpr const char* kCacheNoRetentionSession = "cache.no_retention";

Expand Down Expand Up @@ -342,10 +336,6 @@ class HiveConfig {
// Returns the timestamp unit used when reading timestamps from files.
uint8_t readTimestampUnit(const Config* session) const;

/// Returns the timestamp unit used when writing timestamps into Parquet
/// through Arrow bridge. 0: second, 3: milli, 6: micro, 9: nano.
uint8_t parquetWriteTimestampUnit(const Config* session) const;

/// Returns true to evict out a query scanned data out of in-memory cache
/// right after the access, and also skip staging to the ssd cache. This helps
/// to prevent the cache space pollution from the one-time table scan by large
Expand Down
26 changes: 15 additions & 11 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -677,16 +677,25 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
ioStats_.emplace_back(std::make_shared<io::IoStatistics>());
setMemoryReclaimers(writerInfo_.back().get(), ioStats_.back().get());

// Take the one provided by the user as a starting point, or allocate a new
// one.
const auto& writerOptions = insertTableHandle_->writerOptions();
std::shared_ptr<dwio::common::WriterOptions> options = writerOptions
? writerOptions
: std::make_shared<dwio::common::WriterOptions>();
// Take the writer options provided by the user as a starting point, or
// allocate a new one.
auto options = insertTableHandle_->writerOptions();
if (!options) {
options = writerFactory_->createWriterOptions();
}

const auto* connectorSessionProperties =
connectorQueryCtx_->sessionProperties();

// Acquire file format specific configs. The precedence order is:
//
// 1. First respect any options specified as part of the query plan (accessed
// through insertTableHandle)
// 2. Otherwise, acquire user defined session properties.
// 3. Lastly, acquire general hive connector configs.
options->processSessionConfigs(*connectorSessionProperties);
options->processHiveConnectorConfigs(*hiveConfig_->config());

// Only overwrite options in case they were not already provided.
if (options->schema == nullptr) {
options->schema = getNonPartitionTypes(dataChannels_, inputType_);
Expand Down Expand Up @@ -738,11 +747,6 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
connectorSessionProperties);
}

if (!options->parquetWriteTimestampUnit) {
options->parquetWriteTimestampUnit =
hiveConfig_->parquetWriteTimestampUnit(connectorSessionProperties);
}

if (!options->orcMinCompressionSize) {
options->orcMinCompressionSize = std::optional(
hiveConfig_->orcWriterMinCompressionSize(connectorSessionProperties));
Expand Down
1 change: 1 addition & 0 deletions velox/dwio/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ target_link_libraries(
velox_caching
velox_common_io
velox_common_compression
velox_config
velox_dwio_common_encryption
velox_dwio_common_exception
velox_exception
Expand Down
9 changes: 8 additions & 1 deletion velox/dwio/common/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "velox/common/compression/Compression.h"
#include "velox/common/io/Options.h"
#include "velox/common/memory/Memory.h"
#include "velox/core/Config.h"
#include "velox/dwio/common/ColumnSelector.h"
#include "velox/dwio/common/ErrorTolerance.h"
#include "velox/dwio/common/FlatMapHelper.h"
Expand Down Expand Up @@ -597,13 +598,15 @@ struct WriterOptions {
velox::memory::MemoryPool* memoryPool{nullptr};
const velox::common::SpillConfig* spillConfig{nullptr};
tsan_atomic<bool>* nonReclaimableSection{nullptr};

/// A ready-to-use default memory reclaimer factory. It shall be provided by
/// the system that creates writers to ensure a smooth memory system
/// integration (e.g. graceful suspension upon arbitration request). Writer
/// can choose to implement its custom memory reclaimer if needed and not use
/// this default one.
std::function<std::unique_ptr<velox::memory::MemoryReclaimer>()>
defaultMemoryReclaimerFactory{[]() { return nullptr; }};

std::optional<velox::common::CompressionKind> compressionKind;
std::optional<uint64_t> orcMinCompressionSize{std::nullopt};
std::optional<uint64_t> maxStripeSize{std::nullopt};
Expand All @@ -612,10 +615,14 @@ struct WriterOptions {
std::optional<bool> orcWriterIntegerDictionaryEncodingEnabled{std::nullopt};
std::optional<bool> orcWriterStringDictionaryEncodingEnabled{std::nullopt};
std::map<std::string, std::string> serdeParameters;
std::optional<uint8_t> parquetWriteTimestampUnit;
std::optional<uint8_t> zlibCompressionLevel;
std::optional<uint8_t> zstdCompressionLevel;

// Format-specific WriterOptions subclasses should override these functions to
// specify how to process format-specific session and connector configs. The
// base implementations below are no-ops.
virtual void processSessionConfigs(const Config&) {}
virtual void processHiveConnectorConfigs(const Config&) {}

virtual ~WriterOptions() = default;
};

Expand Down
4 changes: 4 additions & 0 deletions velox/dwio/common/WriterFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ class WriterFactory {
}
#endif

/// Creates a polymorphic writer options object. This is a pure virtual
/// function, so every concrete writer factory has to provide an override. The
/// object is handed back through a pointer to the base
/// dwio::common::WriterOptions type.
virtual std::unique_ptr<dwio::common::WriterOptions>
createWriterOptions() = 0;

private:
const FileFormat format_;
};
Expand Down
7 changes: 6 additions & 1 deletion velox/dwio/dwrf/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
using facebook::velox::common::testutil::TestValue;

namespace facebook::velox::dwrf {

namespace {

dwio::common::StripeProgress getStripeProgress(const WriterContext& context) {
return dwio::common::StripeProgress{
.stripeIndex = context.stripeIndex(),
Expand Down Expand Up @@ -860,6 +860,11 @@ std::unique_ptr<dwio::common::Writer> DwrfWriterFactory::createWriter(
return std::make_unique<Writer>(std::move(sink), dwrfOptions);
}

/// Returns a default-constructed dwrf::WriterOptions, exposed through a
/// pointer to the common writer options base type.
std::unique_ptr<dwio::common::WriterOptions>
DwrfWriterFactory::createWriterOptions() {
  std::unique_ptr<dwio::common::WriterOptions> dwrfOptions =
      std::make_unique<dwrf::WriterOptions>();
  return dwrfOptions;
}

void registerDwrfWriterFactory() {
dwio::common::registerWriterFactory(std::make_shared<DwrfWriterFactory>());
}
Expand Down
4 changes: 3 additions & 1 deletion velox/dwio/dwrf/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

namespace facebook::velox::dwrf {

struct WriterOptions {
struct WriterOptions : public dwio::common::WriterOptions {
std::shared_ptr<const Config> config = std::make_shared<Config>();
std::shared_ptr<const Type> schema;
velox::memory::MemoryPool* memoryPool;
Expand Down Expand Up @@ -220,6 +220,8 @@ class DwrfWriterFactory : public dwio::common::WriterFactory {
std::unique_ptr<dwio::common::Writer> createWriter(
std::unique_ptr<dwio::common::FileSink> sink,
const std::shared_ptr<dwio::common::WriterOptions>& options) override;

std::unique_ptr<dwio::common::WriterOptions> createWriterOptions() override;
};

} // namespace facebook::velox::dwrf
2 changes: 1 addition & 1 deletion velox/dwio/parquet/tests/ParquetTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class ParquetTestBase : public testing::Test, public test::VectorTestBase {
facebook::velox::parquet::WriterOptions options;
options.memoryPool = rootPool_.get();
options.flushPolicyFactory = flushPolicy;
options.compression = compressionKind;
options.compressionKind = compressionKind;
return std::make_unique<facebook::velox::parquet::Writer>(
std::move(sink), options, rowType);
}
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/parquet/tests/reader/E2EFilterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ TEST_F(E2EFilterTest, compression) {
}

options_.dataPageSize = 4 * 1024;
options_.compression = compression;
options_.compressionKind = compression;

testWithTypes(
"tinyint_val:tinyint,"
Expand Down
5 changes: 2 additions & 3 deletions velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ TEST_F(ParquetWriterTest, compression) {
auto sinkPtr = sink.get();
facebook::velox::parquet::WriterOptions writerOptions;
writerOptions.memoryPool = leafPool_.get();
writerOptions.compression = CompressionKind::CompressionKind_SNAPPY;
writerOptions.compressionKind = CompressionKind::CompressionKind_SNAPPY;

const auto& fieldNames = schema->names();

Expand Down Expand Up @@ -151,8 +151,7 @@ DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromWriterOptions) {
10'000, [](auto row) { return Timestamp(row, row); })});
parquet::WriterOptions writerOptions;
writerOptions.memoryPool = leafPool_.get();
writerOptions.parquetWriteTimestampUnit =
static_cast<uint8_t>(TimestampUnit::kMicro);
writerOptions.parquetWriteTimestampUnit = TimestampUnit::kMicro;

// Create an in-memory writer.
auto sink = std::make_unique<MemorySink>(
Expand Down
65 changes: 46 additions & 19 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ std::shared_ptr<WriterProperties> getArrowParquetWriterOptions(
if (!options.enableDictionary) {
properties = properties->disable_dictionary();
}
properties =
properties->compression(getArrowParquetCompression(options.compression));
properties = properties->compression(getArrowParquetCompression(
options.compressionKind.value_or(common::CompressionKind_NONE)));
for (const auto& columnCompressionValues : options.columnCompressionsMap) {
properties->compression(
columnCompressionValues.first,
Expand Down Expand Up @@ -236,7 +236,7 @@ Writer::Writer(
flushPolicy_ = std::make_unique<DefaultFlushPolicy>();
}
options_.timestampUnit =
static_cast<TimestampUnit>(options.parquetWriteTimestampUnit);
options.parquetWriteTimestampUnit.value_or(TimestampUnit::kNano);
arrowContext_->properties =
getArrowParquetWriterOptions(options, flushPolicy_);
setMemoryReclaimers();
Expand Down Expand Up @@ -388,20 +388,6 @@ void Writer::abort() {
arrowContext_.reset();
}

/// Builds a parquet::WriterOptions from the generic writer options. The memory
/// pool is always copied; the compression kind and the Parquet timestamp unit
/// are copied only when the generic options carry a value for them, otherwise
/// the Parquet defaults are left in place.
parquet::WriterOptions getParquetOptions(
    const dwio::common::WriterOptions& options) {
  parquet::WriterOptions converted;
  converted.memoryPool = options.memoryPool;

  const auto& compressionKind = options.compressionKind;
  if (compressionKind.has_value()) {
    converted.compression = compressionKind.value();
  }

  const auto& timestampUnit = options.parquetWriteTimestampUnit;
  if (timestampUnit.has_value()) {
    converted.parquetWriteTimestampUnit = timestampUnit.value();
  }
  return converted;
}

void Writer::setMemoryReclaimers() {
VELOX_CHECK(
!pool_->isLeaf(),
Expand All @@ -419,12 +405,53 @@ void Writer::setMemoryReclaimers() {
generalPool_->setReclaimer(exec::MemoryReclaimer::create());
}

namespace {

/// Reads 'configKey' from 'config' as a uint8_t and converts it to a
/// TimestampUnit. Returns std::nullopt when the lookup yields no value. A
/// present value must be 0 (second), 3 (milli), 6 (micro) or 9 (nano);
/// anything else fails the check.
std::optional<TimestampUnit> getTimestampUnit(
    const Config& config,
    const char* configKey) {
  const auto unit = config.get<uint8_t>(configKey);
  if (!unit) {
    return std::nullopt;
  }
  VELOX_CHECK(
      unit == 0 /*second*/ || unit == 3 /*milli*/ || unit == 6 /*micro*/ ||
          unit == 9 /*nano*/,
      "Invalid timestamp unit: {}",
      unit.value());
  return std::optional(static_cast<TimestampUnit>(unit.value()));
}

} // namespace

/// Fills in the Parquet write timestamp unit from the session config, keyed by
/// kParquetSessionWriteTimestampUnit. A unit that is already set is left
/// untouched.
void WriterOptions::processSessionConfigs(const Config& config) {
  if (parquetWriteTimestampUnit) {
    return;
  }
  parquetWriteTimestampUnit =
      getTimestampUnit(config, kParquetSessionWriteTimestampUnit);
}

/// Fills in the Parquet write timestamp unit from the Hive connector config,
/// keyed by kParquetHiveConnectorWriteTimestampUnit. A unit that is already
/// set is left untouched.
void WriterOptions::processHiveConnectorConfigs(const Config& config) {
  if (parquetWriteTimestampUnit) {
    return;
  }
  parquetWriteTimestampUnit =
      getTimestampUnit(config, kParquetHiveConnectorWriteTimestampUnit);
}

std::unique_ptr<dwio::common::Writer> ParquetWriterFactory::createWriter(
std::unique_ptr<dwio::common::FileSink> sink,
const std::shared_ptr<dwio::common::WriterOptions>& options) {
auto parquetOptions = getParquetOptions(*options);
auto parquetOptions =
std::dynamic_pointer_cast<parquet::WriterOptions>(options);
VELOX_CHECK_NOT_NULL(
parquetOptions,
"Parquet writer factory expected a Parquet WriterOptions object.");
return std::make_unique<Writer>(
std::move(sink), parquetOptions, asRowType(options->schema));
std::move(sink), *parquetOptions, asRowType(options->schema));
}

/// Returns a default-constructed parquet::WriterOptions, exposed through a
/// pointer to the common writer options base type.
std::unique_ptr<dwio::common::WriterOptions>
ParquetWriterFactory::createWriterOptions() {
  std::unique_ptr<dwio::common::WriterOptions> parquetOptions =
      std::make_unique<parquet::WriterOptions>();
  return parquetOptions;
}

} // namespace facebook::velox::parquet
29 changes: 24 additions & 5 deletions velox/dwio/parquet/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#pragma once

#include "velox/common/compression/Compression.h"
#include "velox/core/Config.h"
#include "velox/dwio/common/DataBuffer.h"
#include "velox/dwio/common/FileSink.h"
#include "velox/dwio/common/FlushPolicy.h"
Expand Down Expand Up @@ -86,26 +87,42 @@ class LambdaFlushPolicy : public DefaultFlushPolicy {
std::function<bool()> lambda_;
};

struct WriterOptions {
struct WriterOptions : public dwio::common::WriterOptions {
bool enableDictionary = true;
int64_t dataPageSize = 1'024 * 1'024;
int64_t dictionaryPageSizeLimit = 1'024 * 1'024;

// Growth ratio passed to ArrowDataBufferSink. The default value is a
// heuristic borrowed from
// folly/FBVector(https://github.com/facebook/folly/blob/main/folly/docs/FBVector.md#memory-handling).
double bufferGrowRatio = 1.5;
common::CompressionKind compression = common::CompressionKind_NONE;

arrow::Encoding::type encoding = arrow::Encoding::PLAIN;
velox::memory::MemoryPool* memoryPool;

// The default factory allows the writer to construct the default flush
// policy with the configs in its ctor.
std::function<std::unique_ptr<DefaultFlushPolicy>()> flushPolicyFactory;
std::shared_ptr<CodecOptions> codecOptions;
std::unordered_map<std::string, common::CompressionKind>
columnCompressionsMap;
uint8_t parquetWriteTimestampUnit =
static_cast<uint8_t>(TimestampUnit::kNano);

/// Timestamp unit for Parquet write through Arrow bridge.
/// Default if not specified: TimestampUnit::kNano (9).
std::optional<TimestampUnit> parquetWriteTimestampUnit;
bool writeInt96AsTimestamp = false;

// Parsing session and hive configs.

// This isn't a typo; session and hive connector config names are different
// ('_' vs '-').
static constexpr const char* kParquetSessionWriteTimestampUnit =
"hive.parquet.writer.timestamp_unit";
static constexpr const char* kParquetHiveConnectorWriteTimestampUnit =
"hive.parquet.writer.timestamp-unit";

// Process hive connector and session configs.
void processSessionConfigs(const Config& config) override;
void processHiveConnectorConfigs(const Config& config) override;
};

// Writes Velox vectors into a DataSink using Arrow Parquet writer.
Expand Down Expand Up @@ -176,6 +193,8 @@ class ParquetWriterFactory : public dwio::common::WriterFactory {
std::unique_ptr<dwio::common::Writer> createWriter(
std::unique_ptr<dwio::common::FileSink> sink,
const std::shared_ptr<dwio::common::WriterOptions>& options) override;

std::unique_ptr<dwio::common::WriterOptions> createWriterOptions() override;
};

} // namespace facebook::velox::parquet

0 comments on commit 8156d7d

Please sign in to comment.