diff --git a/velox/common/config/SpillConfig.cpp b/velox/common/config/SpillConfig.cpp index 46ed0580b7b4..e3f08a3ccc31 100644 --- a/velox/common/config/SpillConfig.cpp +++ b/velox/common/config/SpillConfig.cpp @@ -30,7 +30,8 @@ SpillConfig::SpillConfig( int32_t _maxSpillLevel, uint64_t _writerFlushThresholdSize, int32_t _testSpillPct, - const std::string& _compressionKind) + const std::string& _compressionKind, + const std::unordered_map<std::string, std::string>& _writeFileOptions) : filePath(_filePath), maxFileSize( _maxFileSize == 0 ? std::numeric_limits<uint64_t>::max() @@ -45,7 +46,8 @@ SpillConfig::SpillConfig( maxSpillLevel(_maxSpillLevel), writerFlushThresholdSize(_writerFlushThresholdSize), testSpillPct(_testSpillPct), - compressionKind(common::stringToCompressionKind(_compressionKind)) { + compressionKind(common::stringToCompressionKind(_compressionKind)), + writeFileOptions(_writeFileOptions) { VELOX_USER_CHECK_GE( spillableReservationGrowthPct, minSpillableReservationPct, diff --git a/velox/common/config/SpillConfig.h b/velox/common/config/SpillConfig.h index a42eba2b39ca..ef0708f248d0 100644 --- a/velox/common/config/SpillConfig.h +++ b/velox/common/config/SpillConfig.h @@ -36,7 +36,9 @@ struct SpillConfig { int32_t _maxSpillLevel, uint64_t _writerFlushThresholdSize, int32_t _testSpillPct, - const std::string& _compressionKind); + const std::string& _compressionKind, + const std::unordered_map<std::string, std::string>& _writeFileOptions = + {}); /// Returns the hash join spilling level with given 'startBitOffset'. /// @@ -103,5 +105,8 @@ struct SpillConfig { /// CompressionKind when spilling, CompressionKind_NONE means no compression. common::CompressionKind compressionKind; + + /// Custom options passed to velox::FileSystem to create spill WriteFile. 
+ std::unordered_map<std::string, std::string> writeFileOptions; }; } // namespace facebook::velox::common diff --git a/velox/exec/Spill.cpp b/velox/exec/Spill.cpp index 2e2379c55bb7..97eebaf63063 100644 --- a/velox/exec/Spill.cpp +++ b/velox/exec/Spill.cpp @@ -102,7 +102,8 @@ SpillFile::SpillFile( const std::vector<CompareFlags>& sortCompareFlags, const std::string& path, common::CompressionKind compressionKind, - memory::MemoryPool* pool) + memory::MemoryPool* pool, + const std::unordered_map<std::string, std::string>& writeFileOptions) : id_(id), type_(std::move(type)), numSortingKeys_(numSortingKeys), @@ -110,6 +111,7 @@ ordinal_(ordinalCounter_++), path_(fmt::format("{}-{}", path, ordinal_)), compressionKind_(compressionKind), + writeFileOptions_(filesystems::FileOptions{writeFileOptions, nullptr}), pool_(pool) { // NOTE: if the spilling operator has specified the sort comparison flags, // then it must match the number of sorting keys. @@ -120,7 +122,7 @@ WriteFile& SpillFile::output() { if (!output_) { auto fs = filesystems::getFileSystem(path_, nullptr); - output_ = fs->openFileForWrite(path_); + output_ = fs->openFileForWrite(path_, writeFileOptions_); } return *output_; } @@ -156,7 +158,8 @@ SpillFileList::SpillFileList( uint64_t writeBufferSize, common::CompressionKind compressionKind, memory::MemoryPool* pool, - folly::Synchronized<SpillStats>* stats) + folly::Synchronized<SpillStats>* stats, + const std::unordered_map<std::string, std::string>& writeFileOptions) : type_(type), numSortingKeys_(numSortingKeys), sortCompareFlags_(sortCompareFlags), @@ -164,6 +167,7 @@ targetFileSize_(targetFileSize), writeBufferSize_(writeBufferSize), compressionKind_(compressionKind), + writeFileOptions_(writeFileOptions), pool_(pool), stats_(stats) { // NOTE: if the associated spilling operator has specified the sort @@ -186,7 +190,8 @@ WriteFile& SpillFileList::currentOutput() { sortCompareFlags_, fmt::format("{}-{}", path_, files_.size()), compressionKind_, - pool_)); + pool_, + writeFileOptions_)); } return 
files_.back()->output(); } @@ -311,7 +316,8 @@ SpillState::SpillState( uint64_t writeBufferSize, common::CompressionKind compressionKind, memory::MemoryPool* pool, - folly::Synchronized<SpillStats>* stats) + folly::Synchronized<SpillStats>* stats, + const std::unordered_map<std::string, std::string>& writeFileOptions) : path_(path), maxPartitions_(maxPartitions), numSortingKeys_(numSortingKeys), @@ -319,6 +325,7 @@ targetFileSize_(targetFileSize), writeBufferSize_(writeBufferSize), compressionKind_(compressionKind), + writeFileOptions_(writeFileOptions), pool_(pool), stats_(stats), files_(maxPartitions_) {} @@ -358,7 +365,8 @@ uint64_t SpillState::appendToPartition( writeBufferSize_, compressionKind_, pool_, - stats_); + stats_, + writeFileOptions_); } updateSpilledInputBytes(rows->estimateFlatSize()); diff --git a/velox/exec/Spill.h b/velox/exec/Spill.h index 76dfb0bc6f94..de48c47d0206 100644 --- a/velox/exec/Spill.h +++ b/velox/exec/Spill.h @@ -20,6 +20,7 @@ #include "velox/common/compression/Compression.h" #include "velox/common/file/File.h" +#include "velox/common/file/FileSystems.h" #include "velox/exec/TreeOfLosers.h" #include "velox/exec/UnorderedStreamReader.h" #include "velox/vector/ComplexVector.h" @@ -27,7 +28,6 @@ #include "velox/vector/VectorStream.h" namespace facebook::velox::exec { - /// Input stream backed by spill file. /// /// TODO Usage of ByteInputStream as base class is hacky and just happens to @@ -75,7 +75,8 @@ class SpillFile { const std::vector<CompareFlags>& sortCompareFlags, const std::string& path, common::CompressionKind compressionKind, - memory::MemoryPool* pool); + memory::MemoryPool* pool, + const std::unordered_map<std::string, std::string>& writeFileOptions); uint32_t id() const { return id_; @@ -145,6 +146,7 @@ class SpillFile { const int32_t ordinal_; const std::string path_; const common::CompressionKind compressionKind_; + const filesystems::FileOptions writeFileOptions_; memory::MemoryPool* const pool_; // Byte size of the backing file. Set when finishing writing. 
@@ -252,7 +254,8 @@ class SpillFileList { uint64_t writeBufferSize, common::CompressionKind compressionKind, memory::MemoryPool* pool, - folly::Synchronized<SpillStats>* stats); + folly::Synchronized<SpillStats>* stats, + const std::unordered_map<std::string, std::string>& writeFileOptions); /// Adds 'rows' for the positions in 'indices' into 'this'. The indices /// must produce a view where the rows are sorted if sorting is desired. @@ -303,6 +306,7 @@ class SpillFileList { const uint64_t targetFileSize_; const uint64_t writeBufferSize_; const common::CompressionKind compressionKind_; + const std::unordered_map<std::string, std::string> writeFileOptions_; memory::MemoryPool* const pool_; folly::Synchronized<SpillStats>* const stats_; uint32_t nextFileId_{0}; @@ -611,7 +615,9 @@ class SpillState { uint64_t writeBufferSize, common::CompressionKind compressionKind, memory::MemoryPool* pool, - folly::Synchronized<SpillStats>* stats); + folly::Synchronized<SpillStats>* stats, + const std::unordered_map<std::string, std::string>& writeFileOptions = + {}); /// Indicates if a given 'partition' has been spilled or not. bool isPartitionSpilled(int32_t partition) const { @@ -701,6 +707,7 @@ class SpillState { const uint64_t targetFileSize_; const uint64_t writeBufferSize_; const common::CompressionKind compressionKind_; + const std::unordered_map<std::string, std::string> writeFileOptions_; memory::MemoryPool* const pool_; folly::Synchronized<SpillStats>* const stats_; diff --git a/velox/exec/Spiller.cpp b/velox/exec/Spiller.cpp index 40a759d0446d..2528f153d38c 100644 --- a/velox/exec/Spiller.cpp +++ b/velox/exec/Spiller.cpp @@ -44,7 +44,8 @@ Spiller::Spiller( uint64_t writeBufferSize, common::CompressionKind compressionKind, memory::MemoryPool* pool, - folly::Executor* executor) + folly::Executor* executor, + const std::unordered_map<std::string, std::string>& writeFileOptions) : Spiller( type, container, @@ -57,7 +58,8 @@ writeBufferSize, compressionKind, pool, - executor) { + executor, + writeFileOptions) { VELOX_CHECK( type_ == Type::kOrderBy || type_ == Type::kAggregateInput, "Unexpected spiller type: {}", @@ -74,7 +76,8 @@ Spiller::Spiller( uint64_t 
writeBufferSize, common::CompressionKind compressionKind, memory::MemoryPool* pool, - folly::Executor* executor) + folly::Executor* executor, + const std::unordered_map<std::string, std::string>& writeFileOptions) : Spiller( type, container, @@ -87,7 +90,8 @@ writeBufferSize, compressionKind, pool, - executor) { + executor, + writeFileOptions) { VELOX_CHECK_EQ( type, Type::kAggregateOutput, @@ -106,7 +110,8 @@ Spiller::Spiller( uint64_t writeBufferSize, common::CompressionKind compressionKind, memory::MemoryPool* pool, - folly::Executor* executor) + folly::Executor* executor, + const std::unordered_map<std::string, std::string>& writeFileOptions) : Spiller( type, nullptr, @@ -119,7 +124,8 @@ writeBufferSize, compressionKind, pool, - executor) { + executor, + writeFileOptions) { VELOX_CHECK_EQ( type_, Type::kHashJoinProbe, @@ -137,7 +143,8 @@ Spiller::Spiller( uint64_t writeBufferSize, common::CompressionKind compressionKind, memory::MemoryPool* pool, - folly::Executor* executor) + folly::Executor* executor, + const std::unordered_map<std::string, std::string>& writeFileOptions) : Spiller( type, container, @@ -150,7 +157,8 @@ writeBufferSize, compressionKind, pool, - executor) { + executor, + writeFileOptions) { VELOX_CHECK_EQ( type_, Type::kHashJoinBuild, @@ -170,7 +178,8 @@ Spiller::Spiller( uint64_t writeBufferSize, common::CompressionKind compressionKind, memory::MemoryPool* pool, - folly::Executor* executor) + folly::Executor* executor, + const std::unordered_map<std::string, std::string>& writeFileOptions) : type_(type), container_(container), executor_(executor), @@ -186,7 +195,8 @@ writeBufferSize, compressionKind, pool_, - &stats_) { + &stats_, + writeFileOptions) { TestValue::adjust( "facebook::velox::exec::Spiller", const_cast<HashBitRange*>(&bits_)); diff --git a/velox/exec/Spiller.h b/velox/exec/Spiller.h index 7c1793235567..a5d9dce7d114 100644 --- a/velox/exec/Spiller.h +++ b/velox/exec/Spiller.h @@ -55,7 +55,9 @@ class Spiller { uint64_t writeBufferSize, common::CompressionKind compressionKind, 
memory::MemoryPool* pool, - folly::Executor* executor); + folly::Executor* executor, + const std::unordered_map<std::string, std::string>& writeFileOptions = + {}); Spiller( Type type, @@ -65,7 +67,9 @@ writeBufferSize, common::CompressionKind compressionKind, memory::MemoryPool* pool, - folly::Executor* executor); + folly::Executor* executor, + const std::unordered_map<std::string, std::string>& writeFileOptions = + {}); Spiller( Type type, @@ -76,7 +80,9 @@ writeBufferSize, common::CompressionKind compressionKind, memory::MemoryPool* pool, - folly::Executor* executor); + folly::Executor* executor, + const std::unordered_map<std::string, std::string>& writeFileOptions = + {}); Spiller( Type type, @@ -88,7 +94,9 @@ writeBufferSize, common::CompressionKind compressionKind, memory::MemoryPool* pool, - folly::Executor* executor); + folly::Executor* executor, + const std::unordered_map<std::string, std::string>& writeFileOptions = + {}); Type type() const { return type_; @@ -194,7 +202,8 @@ writeBufferSize, common::CompressionKind compressionKind, memory::MemoryPool* pool, - folly::Executor* executor); + folly::Executor* executor, + const std::unordered_map<std::string, std::string>& writeFileOptions); // Invoked to spill. If 'startRowIter' is not null, then we only spill rows // from row container starting at the offset pointed by 'startRowIter'. diff --git a/velox/exec/tests/HashJoinBridgeTest.cpp b/velox/exec/tests/HashJoinBridgeTest.cpp index eeca81c57388..bd16f3d9d4a3 100644 --- a/velox/exec/tests/HashJoinBridgeTest.cpp +++ b/velox/exec/tests/HashJoinBridgeTest.cpp @@ -105,7 +105,8 @@ class HashJoinBridgeTest : public testing::Test, std::vector<CompareFlags>({}), tempDir_->path + "/Spill", common::CompressionKind_NONE, - pool_.get())); + pool_.get(), + std::unordered_map<std::string, std::string>{})); // Create a fake file to avoid too many exception logs in test when spill // file deletion fails. createFile(files.back()->testingFilePath());