From 8f3cd138292bedcd28a27ac6583198d956e60731 Mon Sep 17 00:00:00 2001 From: Xiao Du Date: Fri, 13 Dec 2024 13:11:49 -0800 Subject: [PATCH] Add session properties for scale writer query configs --- .../sphinx/presto_cpp/properties-session.rst | 44 ++++++++++++++++++- .../NativeWorkerSessionPropertyProvider.java | 35 ++++++++++++++- .../presto_cpp/main/SessionProperties.cpp | 41 +++++++++++++++++ .../presto_cpp/main/SessionProperties.h | 27 +++++++++++- .../main/tests/SessionPropertiesTest.cpp | 14 +++++- 5 files changed, 156 insertions(+), 5 deletions(-) diff --git a/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst b/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst index 0fe43e1c08c25..b699b922003a6 100644 --- a/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst +++ b/presto-docs/src/main/sphinx/presto_cpp/properties-session.rst @@ -376,4 +376,46 @@ The shard id to be traced. If not specified, all shards will be matched. * **Default value:** ``""`` The regular expression to match a task for tracing. It will be deprecated if there is -no issue with native_query_trace_fragment_id and native_query_trace_shard_id. \ No newline at end of file +no issue with native_query_trace_fragment_id and native_query_trace_shard_id. + +``native_scaled_writer_rebalance_max_memory_usage_ratio`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``double`` +* **Minimum value:** ``0`` +* **Maximum value:** ``1`` +* **Default value:** ``0.7`` + +The max ratio of a query's used memory to its max capacity, and the scale +writer exchange stops scaling writer processing if the query's current +memory usage exceeds this ratio. The value is in the range of (0, 1]. + +``native_scaled_writer_max_partitions_per_writer`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``integer`` +* **Default value:** ``128`` + +The max number of logical table partitions that can be assigned to a +single table writer thread. 
The logical table partition is used by local +exchange writer for writer scaling, and multiple physical table +partitions can be mapped to the same logical table partition based on the +hash value of calculated partitioned ids. + +``native_scaled_writer_min_partition_processed_bytes_rebalance_threshold`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``bigint`` +* **Default value:** ``134217728`` + +Minimum amount of data processed by a logical table partition to trigger +writer scaling if it is detected as overloaded by scale writer exchange. + +``native_scaled_writer_min_processed_bytes_rebalance_threshold`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``bigint`` +* **Default value:** ``268435456`` + +Minimum amount of data processed by all the logical table partitions to +trigger skewed partition rebalancing by scale writer exchange. diff --git a/presto-main/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java b/presto-main/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java index d2411b5a3cb78..08d6c939b6d84 100644 --- a/presto-main/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java +++ b/presto-main/src/main/java/com/facebook/presto/sessionpropertyproviders/NativeWorkerSessionPropertyProvider.java @@ -22,6 +22,7 @@ import java.util.List; import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty; +import static com.facebook.presto.spi.session.PropertyMetadata.doubleProperty; import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty; import static com.facebook.presto.spi.session.PropertyMetadata.longProperty; import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty; @@ -68,6 +69,10 @@ public class NativeWorkerSessionPropertyProvider public static final String 
NATIVE_PREFIXSORT_NORMALIZED_KEY_MAX_BYTES = "native_prefixsort_normalized_key_max_bytes"; public static final String NATIVE_PREFIXSORT_MIN_ROWS = "native_prefixsort_min_rows"; public static final String NATIVE_OP_TRACE_DIR_CREATE_CONFIG = "native_op_trace_directory_create_config"; + public static final String NATIVE_SCALED_WRITER_REBALANCE_MAX_MEMORY_USAGE_RATIO = "native_scaled_writer_rebalance_max_memory_usage_ratio"; + public static final String NATIVE_SCALED_WRITER_MAX_PARTITIONS_PER_WRITER = "native_scaled_writer_max_partitions_per_writer"; + public static final String NATIVE_SCALED_WRITER_MIN_PARTITION_PROCESSED_BYTES_REBALANCE_THRESHOLD = "native_scaled_writer_min_partition_processed_bytes_rebalance_threshold"; + public static final String NATIVE_SCALED_WRITER_MIN_PROCESSED_BYTES_REBALANCE_THRESHOLD = "native_scaled_writer_min_processed_bytes_rebalance_threshold"; private final List> sessionProperties; @Inject @@ -133,7 +138,7 @@ public NativeWorkerSessionPropertyProvider(FeaturesConfig featuresConfig) longProperty( NATIVE_WRITER_FLUSH_THRESHOLD_BYTES, "Native Execution only. Minimum memory footprint size required to reclaim memory from a file " + - "writer by flushing its buffered data to disk.", + "writer by flushing its buffered data to disk.", 96L << 20, false), booleanProperty( @@ -276,6 +281,34 @@ public NativeWorkerSessionPropertyProvider(FeaturesConfig featuresConfig) "Minimum number of rows to use prefix-sort. " + "The default value (130) has been derived using micro-benchmarking.", 130, + !nativeExecution), + doubleProperty( + NATIVE_SCALED_WRITER_REBALANCE_MAX_MEMORY_USAGE_RATIO, + "The max ratio of a query used memory to its max capacity, " + + "and the scale writer exchange stops scaling writer processing if the query's current " + + "memory usage exceeds this ratio. 
The value is in the range of (0, 1].", + 0.7, + !nativeExecution), + integerProperty( + NATIVE_SCALED_WRITER_MAX_PARTITIONS_PER_WRITER, + "The max number of logical table partitions that can be assigned to a " + + "single table writer thread. The logical table partition is used by local " + + "exchange writer for writer scaling, and multiple physical table " + + "partitions can be mapped to the same logical table partition based on the " + + "hash value of calculated partitioned ids", + 128, + !nativeExecution), + longProperty( + NATIVE_SCALED_WRITER_MIN_PARTITION_PROCESSED_BYTES_REBALANCE_THRESHOLD, + "Minimum amount of data processed by a logical table partition " + + "to trigger writer scaling if it is detected as overloaded by scale writer exchange.", + 128L << 20, + !nativeExecution), + longProperty( + NATIVE_SCALED_WRITER_MIN_PROCESSED_BYTES_REBALANCE_THRESHOLD, + "Minimum amount of data processed by all the logical table partitions " + + "to trigger skewed partition rebalancing by scale writer exchange.", + 256L << 20, !nativeExecution)); } diff --git a/presto-native-execution/presto_cpp/main/SessionProperties.cpp b/presto-native-execution/presto_cpp/main/SessionProperties.cpp index 1c50d2d91d95a..30ef49e3420b8 100644 --- a/presto-native-execution/presto_cpp/main/SessionProperties.cpp +++ b/presto-native-execution/presto_cpp/main/SessionProperties.cpp @@ -401,6 +401,47 @@ SessionProperties::SessionProperties() { false, QueryConfig::kPrefixSortMinRows, std::to_string(c.prefixSortMinRows())); + + addSessionProperty( + kScaleWriterRebalanceMaxMemoryUsageRatio, + "The max ratio of a query used memory to its max capacity, " + "and the scale writer exchange stops scaling writer processing if the query's current " + "memory usage exceeds this ratio. 
The value is in the range of (0, 1].", + DOUBLE(), + false, + QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio, + std::to_string(c.scaleWriterRebalanceMaxMemoryUsageRatio())); + + addSessionProperty( + kScaleWriterMaxPartitionsPerWriter, + "The max number of logical table partitions that can be assigned to a " + "single table writer thread. The logical table partition is used by local " + "exchange writer for writer scaling, and multiple physical table " + "partitions can be mapped to the same logical table partition based on the " + "hash value of calculated partitioned ids.", + INTEGER(), + false, + QueryConfig::kScaleWriterMaxPartitionsPerWriter, + std::to_string(c.scaleWriterMaxPartitionsPerWriter())); + + addSessionProperty( + kScaleWriterMinPartitionProcessedBytesRebalanceThreshold, + "Minimum amount of data processed by a logical table partition " + "to trigger writer scaling if it is detected as overloaded by scale writer exchange.", + BIGINT(), + false, + QueryConfig::kScaleWriterMinPartitionProcessedBytesRebalanceThreshold, + std::to_string( + c.scaleWriterMinPartitionProcessedBytesRebalanceThreshold())); + + addSessionProperty( + kScaleWriterMinProcessedBytesRebalanceThreshold, + "Minimum amount of data processed by all the logical table partitions " + "to trigger skewed partition rebalancing by scale writer exchange.", + BIGINT(), + false, + QueryConfig::kScaleWriterMinProcessedBytesRebalanceThreshold, + std::to_string(c.scaleWriterMinProcessedBytesRebalanceThreshold())); } const std::unordered_map>& diff --git a/presto-native-execution/presto_cpp/main/SessionProperties.h b/presto-native-execution/presto_cpp/main/SessionProperties.h index ab6a36565dd6d..3156c66de7563 100644 --- a/presto-native-execution/presto_cpp/main/SessionProperties.h +++ b/presto-native-execution/presto_cpp/main/SessionProperties.h @@ -178,6 +178,31 @@ class SessionProperties { static constexpr const char* kSelectiveNimbleReaderEnabled = 
"native_selective_nimble_reader_enabled"; + /// The max ratio of a query used memory to its max capacity, and the scale + /// writer exchange stops scaling writer processing if the query's current + /// memory usage exceeds this ratio. The value is in the range of (0, 1]. + static constexpr const char* kScaleWriterRebalanceMaxMemoryUsageRatio = + "native_scaled_writer_rebalance_max_memory_usage_ratio"; + + /// The max number of logical table partitions that can be assigned to a + /// single table writer thread. The logical table partition is used by local + /// exchange writer for writer scaling, and multiple physical table + /// partitions can be mapped to the same logical table partition based on the + /// hash value of calculated partitioned ids. + static constexpr const char* kScaleWriterMaxPartitionsPerWriter = + "native_scaled_writer_max_partitions_per_writer"; + + /// Minimum amount of data processed by a logical table partition to trigger + /// writer scaling if it is detected as overloaded by scale writer exchange. + static constexpr const char* + kScaleWriterMinPartitionProcessedBytesRebalanceThreshold = + "native_scaled_writer_min_partition_processed_bytes_rebalance_threshold"; + + /// Minimum amount of data processed by all the logical table partitions to + /// trigger skewed partition rebalancing by scale writer exchange. + static constexpr const char* kScaleWriterMinProcessedBytesRebalanceThreshold = + "native_scaled_writer_min_processed_bytes_rebalance_threshold"; + /// Enable timezone-less timestamp conversions. static constexpr const char* kLegacyTimestamp = "legacy_timestamp"; @@ -246,7 +271,7 @@ class SessionProperties { /// prefix keys, which might have potential risk of running out of server /// memory. static constexpr const char* kSpillPrefixSortEnabled = - "spill_prefixsort_enabled"; + "native_spill_prefixsort_enabled"; /// Maximum number of bytes to use for the normalized key in prefix-sort. Use /// 0 to disable prefix-sort. 
diff --git a/presto-native-execution/presto_cpp/main/tests/SessionPropertiesTest.cpp b/presto-native-execution/presto_cpp/main/tests/SessionPropertiesTest.cpp index 5da79aac1081f..6b6ad65334660 100644 --- a/presto-native-execution/presto_cpp/main/tests/SessionPropertiesTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/SessionPropertiesTest.cpp @@ -26,11 +26,21 @@ TEST_F(SessionPropertiesTest, validateMapping) { const std::vector names = { SessionProperties::kLegacyTimestamp, SessionProperties::kDriverCpuTimeSliceLimitMs, - SessionProperties::kSpillCompressionCodec}; + SessionProperties::kSpillCompressionCodec, + SessionProperties::kScaleWriterRebalanceMaxMemoryUsageRatio, + SessionProperties::kScaleWriterMaxPartitionsPerWriter, + SessionProperties:: + kScaleWriterMinPartitionProcessedBytesRebalanceThreshold, + SessionProperties::kScaleWriterMinProcessedBytesRebalanceThreshold}; const std::vector veloxConfigNames = { core::QueryConfig::kAdjustTimestampToTimezone, core::QueryConfig::kDriverCpuTimeSliceLimitMs, - core::QueryConfig::kSpillCompressionKind}; + core::QueryConfig::kSpillCompressionKind, + core::QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio, + core::QueryConfig::kScaleWriterMaxPartitionsPerWriter, + core::QueryConfig:: + kScaleWriterMinPartitionProcessedBytesRebalanceThreshold, + core::QueryConfig::kScaleWriterMinProcessedBytesRebalanceThreshold}; auto sessionProperties = SessionProperties().getSessionProperties(); const auto len = names.size(); for (auto i = 0; i < len; i++) {