Skip to content

Commit

Permalink
Pass spill write file options down from SpillConfig (#7621)
Browse files Browse the repository at this point in the history
Summary:
Depending on its implementation, a WriteFile might need additional configuration parameters. Spill uses WriteFile as its spilling medium, so the spill framework needs to be able to pass custom options/parameters down to the WriteFile it creates.

Pull Request resolved: #7621

Reviewed By: xiaoxmeng, kewang1024

Differential Revision: D51419348

Pulled By: tanjialiang

fbshipit-source-id: 2aca039bd732cc684589fb63e21adcc07bcd517c
  • Loading branch information
tanjialiang authored and facebook-github-bot committed Nov 18, 2023
1 parent f0c1158 commit 460c4f1
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 29 deletions.
6 changes: 4 additions & 2 deletions velox/common/config/SpillConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ SpillConfig::SpillConfig(
int32_t _maxSpillLevel,
uint64_t _writerFlushThresholdSize,
int32_t _testSpillPct,
const std::string& _compressionKind)
const std::string& _compressionKind,
const std::unordered_map<std::string, std::string>& _writeFileOptions)
: filePath(_filePath),
maxFileSize(
_maxFileSize == 0 ? std::numeric_limits<int64_t>::max()
Expand All @@ -45,7 +46,8 @@ SpillConfig::SpillConfig(
maxSpillLevel(_maxSpillLevel),
writerFlushThresholdSize(_writerFlushThresholdSize),
testSpillPct(_testSpillPct),
compressionKind(common::stringToCompressionKind(_compressionKind)) {
compressionKind(common::stringToCompressionKind(_compressionKind)),
writeFileOptions(_writeFileOptions) {
VELOX_USER_CHECK_GE(
spillableReservationGrowthPct,
minSpillableReservationPct,
Expand Down
7 changes: 6 additions & 1 deletion velox/common/config/SpillConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ struct SpillConfig {
int32_t _maxSpillLevel,
uint64_t _writerFlushThresholdSize,
int32_t _testSpillPct,
const std::string& _compressionKind);
const std::string& _compressionKind,
const std::unordered_map<std::string, std::string>& _writeFileOptions =
{});

/// Returns the hash join spilling level with given 'startBitOffset'.
///
Expand Down Expand Up @@ -103,5 +105,8 @@ struct SpillConfig {

/// CompressionKind when spilling, CompressionKind_NONE means no compression.
common::CompressionKind compressionKind;

/// Custom options passed to velox::FileSystem to create spill WriteFile.
std::unordered_map<std::string, std::string> writeFileOptions;
};
} // namespace facebook::velox::common
20 changes: 14 additions & 6 deletions velox/exec/Spill.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,16 @@ SpillFile::SpillFile(
const std::vector<CompareFlags>& sortCompareFlags,
const std::string& path,
common::CompressionKind compressionKind,
memory::MemoryPool* pool)
memory::MemoryPool* pool,
const std::unordered_map<std::string, std::string>& writeFileOptions)
: id_(id),
type_(std::move(type)),
numSortingKeys_(numSortingKeys),
sortCompareFlags_(sortCompareFlags),
ordinal_(ordinalCounter_++),
path_(fmt::format("{}-{}", path, ordinal_)),
compressionKind_(compressionKind),
writeFileOptions_(filesystems::FileOptions{writeFileOptions, nullptr}),
pool_(pool) {
// NOTE: if the spilling operator has specified the sort comparison flags,
// then it must match the number of sorting keys.
Expand All @@ -120,7 +122,7 @@ SpillFile::SpillFile(
// Returns the file this spill file writes to, opening it on first use.
//
// The file is opened through the file system resolved for 'path_', and
// 'writeFileOptions_' is handed to that file system so the custom options
// reach the WriteFile it creates. Subsequent calls return the file that is
// already open.
WriteFile& SpillFile::output() {
  if (!output_) {
    auto fs = filesystems::getFileSystem(path_, nullptr);
    // Open exactly once and always with the options; opening without them
    // first would only produce a handle that gets replaced right away.
    output_ = fs->openFileForWrite(path_, writeFileOptions_);
  }
  return *output_;
}
Expand Down Expand Up @@ -156,14 +158,16 @@ SpillFileList::SpillFileList(
uint64_t writeBufferSize,
common::CompressionKind compressionKind,
memory::MemoryPool* pool,
folly::Synchronized<SpillStats>* stats)
folly::Synchronized<SpillStats>* stats,
const std::unordered_map<std::string, std::string>& writeFileOptions)
: type_(type),
numSortingKeys_(numSortingKeys),
sortCompareFlags_(sortCompareFlags),
path_(path),
targetFileSize_(targetFileSize),
writeBufferSize_(writeBufferSize),
compressionKind_(compressionKind),
writeFileOptions_(writeFileOptions),
pool_(pool),
stats_(stats) {
// NOTE: if the associated spilling operator has specified the sort
Expand All @@ -186,7 +190,8 @@ WriteFile& SpillFileList::currentOutput() {
sortCompareFlags_,
fmt::format("{}-{}", path_, files_.size()),
compressionKind_,
pool_));
pool_,
writeFileOptions_));
}
return files_.back()->output();
}
Expand Down Expand Up @@ -311,14 +316,16 @@ SpillState::SpillState(
uint64_t writeBufferSize,
common::CompressionKind compressionKind,
memory::MemoryPool* pool,
folly::Synchronized<SpillStats>* stats)
folly::Synchronized<SpillStats>* stats,
const std::unordered_map<std::string, std::string>& writeFileOptions)
: path_(path),
maxPartitions_(maxPartitions),
numSortingKeys_(numSortingKeys),
sortCompareFlags_(sortCompareFlags),
targetFileSize_(targetFileSize),
writeBufferSize_(writeBufferSize),
compressionKind_(compressionKind),
writeFileOptions_(writeFileOptions),
pool_(pool),
stats_(stats),
files_(maxPartitions_) {}
Expand Down Expand Up @@ -358,7 +365,8 @@ uint64_t SpillState::appendToPartition(
writeBufferSize_,
compressionKind_,
pool_,
stats_);
stats_,
writeFileOptions_);
}
updateSpilledInputBytes(rows->estimateFlatSize());

Expand Down
15 changes: 11 additions & 4 deletions velox/exec/Spill.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@

#include "velox/common/compression/Compression.h"
#include "velox/common/file/File.h"
#include "velox/common/file/FileSystems.h"
#include "velox/exec/TreeOfLosers.h"
#include "velox/exec/UnorderedStreamReader.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/DecodedVector.h"
#include "velox/vector/VectorStream.h"

namespace facebook::velox::exec {

/// Input stream backed by spill file.
///
/// TODO Usage of ByteInputStream as base class is hacky and just happens to
Expand Down Expand Up @@ -75,7 +75,8 @@ class SpillFile {
const std::vector<CompareFlags>& sortCompareFlags,
const std::string& path,
common::CompressionKind compressionKind,
memory::MemoryPool* pool);
memory::MemoryPool* pool,
const std::unordered_map<std::string, std::string>& writeFileOptions);

uint32_t id() const {
return id_;
Expand Down Expand Up @@ -145,6 +146,7 @@ class SpillFile {
const int32_t ordinal_;
const std::string path_;
const common::CompressionKind compressionKind_;
const filesystems::FileOptions writeFileOptions_;
memory::MemoryPool* const pool_;

// Byte size of the backing file. Set when finishing writing.
Expand Down Expand Up @@ -252,7 +254,8 @@ class SpillFileList {
uint64_t writeBufferSize,
common::CompressionKind compressionKind,
memory::MemoryPool* pool,
folly::Synchronized<SpillStats>* stats);
folly::Synchronized<SpillStats>* stats,
const std::unordered_map<std::string, std::string>& writeFileOptions);

/// Adds 'rows' for the positions in 'indices' into 'this'. The indices
/// must produce a view where the rows are sorted if sorting is desired.
Expand Down Expand Up @@ -303,6 +306,7 @@ class SpillFileList {
const uint64_t targetFileSize_;
const uint64_t writeBufferSize_;
const common::CompressionKind compressionKind_;
const std::unordered_map<std::string, std::string> writeFileOptions_;
memory::MemoryPool* const pool_;
folly::Synchronized<SpillStats>* const stats_;
uint32_t nextFileId_{0};
Expand Down Expand Up @@ -611,7 +615,9 @@ class SpillState {
uint64_t writeBufferSize,
common::CompressionKind compressionKind,
memory::MemoryPool* pool,
folly::Synchronized<SpillStats>* stats);
folly::Synchronized<SpillStats>* stats,
const std::unordered_map<std::string, std::string>& writeFileOptions =
{});

/// Indicates if a given 'partition' has been spilled or not.
bool isPartitionSpilled(int32_t partition) const {
Expand Down Expand Up @@ -701,6 +707,7 @@ class SpillState {
const uint64_t targetFileSize_;
const uint64_t writeBufferSize_;
const common::CompressionKind compressionKind_;
const std::unordered_map<std::string, std::string> writeFileOptions_;
memory::MemoryPool* const pool_;
folly::Synchronized<SpillStats>* const stats_;

Expand Down
30 changes: 20 additions & 10 deletions velox/exec/Spiller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ Spiller::Spiller(
uint64_t writeBufferSize,
common::CompressionKind compressionKind,
memory::MemoryPool* pool,
folly::Executor* executor)
folly::Executor* executor,
const std::unordered_map<std::string, std::string>& writeFileOptions)
: Spiller(
type,
container,
Expand All @@ -57,7 +58,8 @@ Spiller::Spiller(
writeBufferSize,
compressionKind,
pool,
executor) {
executor,
writeFileOptions) {
VELOX_CHECK(
type_ == Type::kOrderBy || type_ == Type::kAggregateInput,
"Unexpected spiller type: {}",
Expand All @@ -74,7 +76,8 @@ Spiller::Spiller(
uint64_t writeBufferSize,
common::CompressionKind compressionKind,
memory::MemoryPool* pool,
folly::Executor* executor)
folly::Executor* executor,
const std::unordered_map<std::string, std::string>& writeFileOptions)
: Spiller(
type,
container,
Expand All @@ -87,7 +90,8 @@ Spiller::Spiller(
writeBufferSize,
compressionKind,
pool,
executor) {
executor,
writeFileOptions) {
VELOX_CHECK_EQ(
type,
Type::kAggregateOutput,
Expand All @@ -106,7 +110,8 @@ Spiller::Spiller(
uint64_t writeBufferSize,
common::CompressionKind compressionKind,
memory::MemoryPool* pool,
folly::Executor* executor)
folly::Executor* executor,
const std::unordered_map<std::string, std::string>& writeFileOptions)
: Spiller(
type,
nullptr,
Expand All @@ -119,7 +124,8 @@ Spiller::Spiller(
writeBufferSize,
compressionKind,
pool,
executor) {
executor,
writeFileOptions) {
VELOX_CHECK_EQ(
type_,
Type::kHashJoinProbe,
Expand All @@ -137,7 +143,8 @@ Spiller::Spiller(
uint64_t writeBufferSize,
common::CompressionKind compressionKind,
memory::MemoryPool* pool,
folly::Executor* executor)
folly::Executor* executor,
const std::unordered_map<std::string, std::string>& writeFileOptions)
: Spiller(
type,
container,
Expand All @@ -150,7 +157,8 @@ Spiller::Spiller(
writeBufferSize,
compressionKind,
pool,
executor) {
executor,
writeFileOptions) {
VELOX_CHECK_EQ(
type_,
Type::kHashJoinBuild,
Expand All @@ -170,7 +178,8 @@ Spiller::Spiller(
uint64_t writeBufferSize,
common::CompressionKind compressionKind,
memory::MemoryPool* pool,
folly::Executor* executor)
folly::Executor* executor,
const std::unordered_map<std::string, std::string>& writeFileOptions)
: type_(type),
container_(container),
executor_(executor),
Expand All @@ -186,7 +195,8 @@ Spiller::Spiller(
writeBufferSize,
compressionKind,
pool_,
&stats_) {
&stats_,
writeFileOptions) {
TestValue::adjust(
"facebook::velox::exec::Spiller", const_cast<HashBitRange*>(&bits_));

Expand Down
19 changes: 14 additions & 5 deletions velox/exec/Spiller.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ class Spiller {
uint64_t writeBufferSize,
common::CompressionKind compressionKind,
memory::MemoryPool* pool,
folly::Executor* executor);
folly::Executor* executor,
const std::unordered_map<std::string, std::string>& writeFileOptions =
{});

Spiller(
Type type,
Expand All @@ -65,7 +67,9 @@ class Spiller {
uint64_t writeBufferSize,
common::CompressionKind compressionKind,
memory::MemoryPool* pool,
folly::Executor* executor);
folly::Executor* executor,
const std::unordered_map<std::string, std::string>& writeFileOptions =
{});

Spiller(
Type type,
Expand All @@ -76,7 +80,9 @@ class Spiller {
uint64_t writeBufferSize,
common::CompressionKind compressionKind,
memory::MemoryPool* pool,
folly::Executor* executor);
folly::Executor* executor,
const std::unordered_map<std::string, std::string>& writeFileOptions =
{});

Spiller(
Type type,
Expand All @@ -88,7 +94,9 @@ class Spiller {
uint64_t writeBufferSize,
common::CompressionKind compressionKind,
memory::MemoryPool* pool,
folly::Executor* executor);
folly::Executor* executor,
const std::unordered_map<std::string, std::string>& writeFileOptions =
{});

Type type() const {
return type_;
Expand Down Expand Up @@ -194,7 +202,8 @@ class Spiller {
uint64_t writeBufferSize,
common::CompressionKind compressionKind,
memory::MemoryPool* pool,
folly::Executor* executor);
folly::Executor* executor,
const std::unordered_map<std::string, std::string>& writeFileOptions);

// Invoked to spill. If 'startRowIter' is not null, then we only spill rows
// from row container starting at the offset pointed by 'startRowIter'.
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/tests/HashJoinBridgeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ class HashJoinBridgeTest : public testing::Test,
std::vector<CompareFlags>({}),
tempDir_->path + "/Spill",
common::CompressionKind_NONE,
pool_.get()));
pool_.get(),
std::unordered_map<std::string, std::string>{}));
// Create a fake file to avoid too many exception logs in test when spill
// file deletion fails.
createFile(files.back()->testingFilePath());
Expand Down

0 comments on commit 460c4f1

Please sign in to comment.