diff --git a/.github/workflows/experimental.yml b/.github/workflows/experimental.yml index 9c55c338f678..fbca400247ec 100644 --- a/.github/workflows/experimental.yml +++ b/.github/workflows/experimental.yml @@ -47,7 +47,7 @@ permissions: jobs: compile: - runs-on: 8-core-ubuntu + runs-on: 16-core-ubuntu timeout-minutes: 120 env: CCACHE_DIR: "${{ github.workspace }}/.ccache/" @@ -103,8 +103,20 @@ jobs: name: join path: velox/_build/debug/velox/exec/tests/velox_join_fuzzer_test + - name: Upload Presto expression fuzzer + uses: actions/upload-artifact@v3 + with: + name: presto_expression_fuzzer + path: velox/_build/debug/velox/expression/fuzzer/velox_expression_fuzzer_test + + - name: Upload Spark expression fuzzer + uses: actions/upload-artifact@v3 + with: + name: spark_expression_fuzzer + path: velox/_build/debug/velox/expression/fuzzer/spark_expression_fuzzer_test + presto-java-aggregation-fuzzer-run: - runs-on: 8-core-ubuntu + runs-on: 16-core-ubuntu container: ghcr.io/facebookincubator/velox-dev:presto-java timeout-minutes: 120 env: @@ -249,3 +261,96 @@ jobs: name: join-fuzzer-failure-artifacts path: | /tmp/join_fuzzer_repro + + presto-expression-fuzzer-run: + runs-on: ubuntu-latest + needs: compile + timeout-minutes: 120 + steps: + + - name: "Checkout Repo" + uses: actions/checkout@v3 + with: + ref: "${{ inputs.ref || 'main' }}" + + - name: "Install dependencies" + run: source ./scripts/setup-ubuntu.sh && install_apt_deps + + - name: Download presto fuzzer + uses: actions/download-artifact@v3 + with: + name: presto_expression_fuzzer + + - name: "Run Presto Fuzzer" + run: | + mkdir -p /tmp/presto_fuzzer_repro/ + rm -rfv /tmp/presto_fuzzer_repro/* + chmod -R 777 /tmp/presto_fuzzer_repro + chmod +x velox_expression_fuzzer_test + ./velox_expression_fuzzer_test \ + --seed ${RANDOM} \ + --enable_variadic_signatures \ + --velox_fuzzer_enable_complex_types \ + --velox_fuzzer_enable_decimal_type \ + --lazy_vector_generation_ratio 0.2 \ + --velox_fuzzer_enable_column_reuse \ + --velox_fuzzer_enable_expression_reuse \ + --max_expression_trees_per_step 2 \ + --retry_with_try \ + --enable_dereference \ + --duration_sec 1800 \ + --logtostderr=1 \ + --minloglevel=1 \ + --repro_persist_path=/tmp/presto_fuzzer_repro \ + && echo -e "\n\nFuzzer run finished successfully." + + - name: Archive Presto expression production artifacts + if: always() + uses: actions/upload-artifact@v3 + with: + name: presto-fuzzer-failure-artifacts + path: | + /tmp/presto_fuzzer_repro + + spark-expression-fuzzer-run: + runs-on: ubuntu-latest + needs: compile + timeout-minutes: 120 + steps: + + - name: "Checkout Repo" + uses: actions/checkout@v3 + with: + ref: "${{ inputs.ref || 'main' }}" + + - name: "Install dependencies" + run: source ./scripts/setup-ubuntu.sh && install_apt_deps + + - name: Download spark fuzzer + uses: actions/download-artifact@v3 + with: + name: spark_expression_fuzzer + + - name: "Run Spark Fuzzer" + run: | + mkdir -p /tmp/spark_fuzzer_repro/ + rm -rfv /tmp/spark_fuzzer_repro/* + chmod -R 777 /tmp/spark_fuzzer_repro + chmod +x spark_expression_fuzzer_test + ./spark_expression_fuzzer_test \ + --seed ${RANDOM} \ + --duration_sec 1800 \ + --logtostderr=1 \ + --minloglevel=1 \ + --repro_persist_path=/tmp/spark_fuzzer_repro \ + --velox_fuzzer_enable_decimal_type \ + --retry_with_try \ + && echo -e "\n\nSpark Fuzzer run finished successfully." 
+ + - name: Archive Spark expression production artifacts + if: always() + uses: actions/upload-artifact@v3 + with: + name: spark-fuzzer-failure-artifacts + path: | + /tmp/spark_fuzzer_repro diff --git a/.github/workflows/linux-build.yml b/.github/workflows/linux-build.yml index 1ca5cce9c7ec..84171dfaee82 100644 --- a/.github/workflows/linux-build.yml +++ b/.github/workflows/linux-build.yml @@ -88,6 +88,22 @@ jobs: mv ./${MINIO_BINARY} /usr/local/bin/ fi + - name: Install Proxygen + run: | + FB_OS_VERSION="v2024.05.20.00" + PROXYGEN_BINARY="proxygen-${FB_OS_VERSION}.tar.gz" + if [ ! -f /usr/local/bin/${PROXYGEN_BINARY} ]; then + wget https://github.com/facebook/proxygen/archive/refs/tags/${FB_OS_VERSION}.tar.gz -O ${PROXYGEN_BINARY} + tar -xzf ${PROXYGEN_BINARY} + cd proxygen-${FB_OS_VERSION} + ./build.sh # Assumes there's a build script or replace with the appropriate build command + chmod +x ./proxygen-${FB_OS_VERSION} + mv ./proxygen-${FB_OS_VERSION} /usr/local/bin/ + cd .. + rm -rf proxygen-${FB_OS_VERSION} ${PROXYGEN_BINARY} + fi + + - uses: assignUser/stash/restore@v1 with: path: '${{ env.CCACHE_DIR }}' diff --git a/.github/workflows/macos.yml b/.github/workflows/macos.yml index 63672ec7aa87..c56baff117c7 100644 --- a/.github/workflows/macos.yml +++ b/.github/workflows/macos.yml @@ -22,6 +22,7 @@ on: - "CMakeLists.txt" - "CMake/**" - "third_party/**" + - "scripts/setup-macos.sh" - ".github/workflows/macos.yml" pull_request: @@ -31,6 +32,7 @@ on: - "CMakeLists.txt" - "CMake/**" - "third_party/**" + - "scripts/setup-macos.sh" - ".github/workflows/macos.yml" permissions: diff --git a/CMake/resolve_dependency_modules/README.md b/CMake/resolve_dependency_modules/README.md index a164e50ad332..ce766cb17bcd 100644 --- a/CMake/resolve_dependency_modules/README.md +++ b/CMake/resolve_dependency_modules/README.md @@ -32,12 +32,11 @@ by Velox. See details on bundling below. | re2 | 2021-04-01 | Yes | | fmt | 10.1.1 | Yes | | simdjson | 3.9.3 | Yes | -| fast_float | v6.1.6 | Yes | -| folly | v2024.09.16.00 | Yes | -| fizz | v2024.09.16.00 | No | -| wangle | v2024.09.16.00 | No | -| mvfst | v2024.09.16.00 | No | -| fbthrift | v2024.09.16.00 | No | +| folly | v2024.05.20.00 | Yes | +| fizz | v2024.05.20.00 | No | +| wangle | v2024.05.20.00 | No | +| mvfst | v2024.05.20.00 | No | +| fbthrift | v2024.05.20.00 | No | | libstemmer | 2.2.0 | Yes | | DuckDB (testing) | 0.8.1 | Yes | | cpr (testing) | 1.10.15 | Yes | diff --git a/CMake/resolve_dependency_modules/fast_float.cmake b/CMake/resolve_dependency_modules/fast_float.cmake deleted file mode 100644 index 36772470146a..000000000000 --- a/CMake/resolve_dependency_modules/fast_float.cmake +++ /dev/null @@ -1,34 +0,0 @@ -# Copyright (c) Facebook, Inc. and its affiliates. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-include_guard(GLOBAL) - -set(VELOX_FAST_FLOAT_VERSION 6.1.6) -set(VELOX_FAST_FLOAT_BUILD_SHA256_CHECKSUM - 4458aae4b0eb55717968edda42987cabf5f7fc737aee8fede87a70035dba9ab0) -set(VELOX_FAST_FLOAT_SOURCE_URL - "https://github.com/fastfloat/fast_float/archive/refs/tags/v${VELOX_FAST_FLOAT_VERSION}.tar.gz" -) - -resolve_dependency_url(FAST_FLOAT) - -message(STATUS "Building fast_float from source") -FetchContent_Declare( - fast_float - URL ${VELOX_FAST_FLOAT_SOURCE_URL} - URL_HASH ${VELOX_FAST_FLOAT_BUILD_SHA256_CHECKSUM}) - -FetchContent_MakeAvailable(fast_float) -# Folly searches for the header path directly so need to make sure to search in -# the dependency path. -list(APPEND CMAKE_PREFIX_PATH "${fast_float_SOURCE_DIR}") diff --git a/CMake/resolve_dependency_modules/folly/CMakeLists.txt b/CMake/resolve_dependency_modules/folly/CMakeLists.txt index 6479e3613bea..df7a489384ee 100644 --- a/CMake/resolve_dependency_modules/folly/CMakeLists.txt +++ b/CMake/resolve_dependency_modules/folly/CMakeLists.txt @@ -14,15 +14,13 @@ project(Folly) cmake_minimum_required(VERSION 3.14) -set(VELOX_FOLLY_BUILD_VERSION v2024.09.16.00) +set(VELOX_FOLLY_BUILD_VERSION v2024.05.20.00) set(VELOX_FOLLY_BUILD_SHA256_CHECKSUM - 0a375f2f3e15a2679b4d21fa1064986830a52f59c74d82b3bda1aeeea4e77da0) + f4a450e59f0b74d1b0b4e5c55ae38e820166c95d02f8a8a298e54a49a90aa057) set(VELOX_FOLLY_SOURCE_URL "https://github.com/facebook/folly/releases/download/${VELOX_FOLLY_BUILD_VERSION}/folly-${VELOX_FOLLY_BUILD_VERSION}.tar.gz" ) -set(fast_float_SOURCE BUNDLED) -resolve_dependency(fast_float) resolve_dependency_url(FOLLY) message(STATUS "Building Folly from source") diff --git a/CMake/resolve_dependency_modules/folly/folly-gflags-glog.patch b/CMake/resolve_dependency_modules/folly/folly-gflags-glog.patch index a33098900042..6ef25f5ac231 100644 --- a/CMake/resolve_dependency_modules/folly/folly-gflags-glog.patch +++ b/CMake/resolve_dependency_modules/folly/folly-gflags-glog.patch @@ -13,10 +13,10 @@ # limitations under the License. 
--- a/CMake/folly-deps.cmake +++ b/CMake/folly-deps.cmake -@@ -55,19 +55,23 @@ list(APPEND FOLLY_INCLUDE_DIRECTORIES ${DOUBLE_CONVERSION_INCLUDE_DIR}) - find_package(FastFloat MODULE REQUIRED) - list(APPEND FOLLY_INCLUDE_DIRECTORIES ${FASTFLOAT_INCLUDE_DIR}) - +@@ -52,19 +52,20 @@ find_package(DoubleConversion MODULE REQUIRED) + list(APPEND FOLLY_LINK_LIBRARIES ${DOUBLE_CONVERSION_LIBRARY}) + list(APPEND FOLLY_INCLUDE_DIRECTORIES ${DOUBLE_CONVERSION_INCLUDE_DIR}) + -find_package(Gflags MODULE) -set(FOLLY_HAVE_LIBGFLAGS ${LIBGFLAGS_FOUND}) -if(LIBGFLAGS_FOUND) @@ -27,14 +27,12 @@ +find_package(gflags) +set(FOLLY_HAVE_LIBGFLAGS ${gflags_FOUND}) +if(gflags_FOUND) -+ list(APPEND FOLLY_LINK_LIBRARIES ${gflags_LIBRARY}) -+ list(APPEND FOLLY_INCLUDE_DIRECTORIES ${gflags_INCLUDE_DIR}) -+ set(FOLLY_LIBGFLAGS_LIBRARY ${gflags_LIBRARY}) -+ set(FOLLY_LIBGFLAGS_INCLUDE ${gflags_INCLUDE_DIR}) -+ message(STATUS "gflags_INCLUDE_DIR: ${gflags_INCLUDE_DIR}") -+ message(STATUS "gflags_LIBRARY: ${gflags_LIBRARY}") ++ list(APPEND FOLLY_LINK_LIBRARIES ${gflags_LIBRARY}) ++ list(APPEND FOLLY_INCLUDE_DIRECTORIES ${gflags_INCLUDE_DIR}) ++ set(FOLLY_LIBGFLAGS_LIBRARY ${gflags_LIBRARY}) ++ set(FOLLY_LIBGFLAGS_INCLUDE ${gflags_INCLUDE_DIR}) endif() - + -find_package(Glog MODULE) -set(FOLLY_HAVE_LIBGLOG ${GLOG_FOUND}) -list(APPEND FOLLY_LINK_LIBRARIES ${GLOG_LIBRARY}) @@ -43,8 +41,7 @@ +set(FOLLY_HAVE_LIBGLOG ${glog_FOUND}) +list(APPEND FOLLY_LINK_LIBRARIES ${glog_LIBRARY}) +list(APPEND FOLLY_INCLUDE_DIRECTORIES ${glog_INCLUDE_DIR}) -+message(STATUS "glog_INCLUDE_DIR: ${glog_INCLUDE_DIR}") -+message(STATUS "glog_LIBRARY: ${glog_LIBRARY}") - ++message(STATUS "glog_INCLUDE_DIR: ${gflags_LINRARY}") + find_package(LibEvent MODULE REQUIRED) list(APPEND FOLLY_LINK_LIBRARIES ${LIBEVENT_LIB}) diff --git a/CMakeLists.txt b/CMakeLists.txt index 70e50e77782e..07dee95e44d3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -487,6 +487,12 @@ if(${VELOX_BUILD_TESTING}) resolve_dependency(gRPC) endif() +if(VELOX_ENABLE_REMOTE_FUNCTIONS) + find_package(fizz CONFIG REQUIRED) + find_package(wangle CONFIG REQUIRED) + find_package(proxygen CONFIG REQUIRED) +endif() + if(VELOX_ENABLE_REMOTE_FUNCTIONS) # TODO: Move this to use resolve_dependency(). For some reason, FBThrift # requires clients to explicitly install fizz and wangle. 
diff --git a/scripts/setup-centos9.sh b/scripts/setup-centos9.sh index 1efb8e53ec34..2914be251f58 100755 --- a/scripts/setup-centos9.sh +++ b/scripts/setup-centos9.sh @@ -39,11 +39,11 @@ USE_CLANG="${USE_CLANG:-false}" export INSTALL_PREFIX=${INSTALL_PREFIX:-"/usr/local"} DEPENDENCY_DIR=${DEPENDENCY_DIR:-$(pwd)/deps-download} -FB_OS_VERSION="v2024.09.16.00" +FB_OS_VERSION="v2024.05.20.00" FMT_VERSION="10.1.1" BOOST_VERSION="boost-1.84.0" ARROW_VERSION="15.0.0" -FAST_FLOAT_VERSION="v6.1.6" +STEMMER_VERSION="2.2.0" function dnf_install { dnf install -y -q --setopt=install_weak_deps=False "$@" @@ -178,6 +178,17 @@ function install_duckdb { fi } +function install_stemmer { + wget_and_untar https://snowballstem.org/dist/libstemmer_c-${STEMMER_VERSION}.tar.gz stemmer + ( + cd ${DEPENDENCY_DIR}/stemmer + sed -i '/CPPFLAGS=-Iinclude/ s/$/ -fPIC/' Makefile + make clean && make "-j${NPROC}" + ${SUDO} cp libstemmer.a ${INSTALL_PREFIX}/lib/ + ${SUDO} cp include/libstemmer.h ${INSTALL_PREFIX}/include/ + ) +} + function install_arrow { wget_and_untar https://archive.apache.org/dist/arrow/arrow-${ARROW_VERSION}/apache-arrow-${ARROW_VERSION}.tar.gz arrow cmake_install_dir arrow/cpp \ @@ -210,12 +221,6 @@ function install_cuda { dnf install -y cuda-nvcc-$(echo $1 | tr '.' '-') cuda-cudart-devel-$(echo $1 | tr '.' '-') } -function install_fast_float { - # Dependency of folly. - wget_and_untar https://github.com/fastfloat/fast_float/archive/refs/tags/${FAST_FLOAT_VERSION}.tar.gz fast_float - cmake_install_dir fast_float -} - function install_velox_deps { run_and_time install_velox_deps_from_dnf run_and_time install_conda @@ -226,13 +231,13 @@ function install_velox_deps { run_and_time install_boost run_and_time install_protobuf run_and_time install_fmt - run_and_time install_fast_float run_and_time install_folly run_and_time install_fizz run_and_time install_wangle run_and_time install_mvfst run_and_time install_fbthrift run_and_time install_duckdb + run_and_time install_stemmer run_and_time install_arrow } diff --git a/scripts/setup-macos.sh b/scripts/setup-macos.sh index b394e8ce4755..45d621abb944 100755 --- a/scripts/setup-macos.sh +++ b/scripts/setup-macos.sh @@ -42,9 +42,10 @@ BUILD_DUCKDB="${BUILD_DUCKDB:-true}" DEPENDENCY_DIR=${DEPENDENCY_DIR:-$(pwd)} MACOS_VELOX_DEPS="bison flex gflags glog googletest icu4c libevent libsodium lz4 lzo openssl protobuf@21 snappy xz zstd" MACOS_BUILD_DEPS="ninja cmake" -FB_OS_VERSION="v2024.09.16.00" +FB_OS_VERSION="v2024.05.20.00" FMT_VERSION="10.1.1" FAST_FLOAT_VERSION="v6.1.6" +STEMMER_VERSION="2.2.0" function update_brew { DEFAULT_BREW_PATH=/usr/local/bin/brew @@ -143,12 +144,6 @@ function install_re2 { cmake_install_dir re2 -DRE2_BUILD_TESTING=OFF } -function install_fast_float { - # Dependency of folly. 
- wget_and_untar https://github.com/fastfloat/fast_float/archive/refs/tags/${FAST_FLOAT_VERSION}.tar.gz fast_float - cmake_install_dir fast_float -} - function install_duckdb { if $BUILD_DUCKDB ; then echo 'Building DuckDB' @@ -157,19 +152,30 @@ function install_duckdb { fi } +function install_stemmer { + wget_and_untar https://snowballstem.org/dist/libstemmer_c-${STEMMER_VERSION}.tar.gz stemmer + ( + cd ${DEPENDENCY_DIR}/stemmer + sed -i '/CPPFLAGS=-Iinclude/ s/$/ -fPIC/' Makefile + make clean && make "-j${NPROC}" + ${SUDO} cp libstemmer.a ${INSTALL_PREFIX}/lib/ + ${SUDO} cp include/libstemmer.h ${INSTALL_PREFIX}/include/ + ) +} + function install_velox_deps { run_and_time install_velox_deps_from_brew run_and_time install_ranges_v3 run_and_time install_double_conversion run_and_time install_re2 run_and_time install_fmt - run_and_time install_fast_float run_and_time install_folly run_and_time install_fizz run_and_time install_wangle run_and_time install_mvfst run_and_time install_fbthrift run_and_time install_duckdb + run_and_time install_stemmer } (return 2> /dev/null) && return # If script was sourced, don't run commands. diff --git a/scripts/setup-ubuntu.sh b/scripts/setup-ubuntu.sh index c4785af00fe5..5ecf27235652 100755 --- a/scripts/setup-ubuntu.sh +++ b/scripts/setup-ubuntu.sh @@ -54,11 +54,11 @@ function install_clang15 { ${SUDO} apt install ${CLANG_PACKAGE_LIST} -y } -FB_OS_VERSION="v2024.09.16.00" +FB_OS_VERSION="v2024.05.20.00" FMT_VERSION="10.1.1" BOOST_VERSION="boost-1.84.0" ARROW_VERSION="15.0.0" -FAST_FLOAT_VERSION="v6.1.6" +STEMMER_VERSION="2.2.0" # Install packages required for build. function install_build_prerequisites { @@ -186,6 +186,17 @@ function install_duckdb { fi } +function install_stemmer { + wget_and_untar https://snowballstem.org/dist/libstemmer_c-${STEMMER_VERSION}.tar.gz stemmer + ( + cd ${DEPENDENCY_DIR}/stemmer + sed -i '/CPPFLAGS=-Iinclude/ s/$/ -fPIC/' Makefile + make clean && make "-j${NPROC}" + ${SUDO} cp libstemmer.a ${INSTALL_PREFIX}/lib/ + ${SUDO} cp include/libstemmer.h ${INSTALL_PREFIX}/include/ + ) +} + function install_arrow { wget_and_untar https://archive.apache.org/dist/arrow/arrow-${ARROW_VERSION}/apache-arrow-${ARROW_VERSION}.tar.gz arrow cmake_install_dir arrow/cpp \ @@ -223,17 +234,10 @@ function install_cuda { $SUDO apt install -y cuda-nvcc-$(echo $1 | tr '.' '-') cuda-cudart-dev-$(echo $1 | tr '.' '-') } -function install_fast_float { - # Dependency of folly. - wget_and_untar https://github.com/fastfloat/fast_float/archive/refs/tags/${FAST_FLOAT_VERSION}.tar.gz fast_float - cmake_install_dir fast_float -} - function install_velox_deps { run_and_time install_velox_deps_from_apt run_and_time install_fmt run_and_time install_boost - run_and_time install_fast_float run_and_time install_folly run_and_time install_fizz run_and_time install_wangle @@ -241,6 +245,7 @@ function install_velox_deps { run_and_time install_fbthrift run_and_time install_conda run_and_time install_duckdb + run_and_time install_stemmer run_and_time install_arrow } diff --git a/velox/common/config/CMakeLists.txt b/velox/common/config/CMakeLists.txt index 7780665a2925..9639a2c8b6f7 100644 --- a/velox/common/config/CMakeLists.txt +++ b/velox/common/config/CMakeLists.txt @@ -12,13 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-if (${VELOX_BUILD_TESTING}) +if(${VELOX_BUILD_TESTING}) add_subdirectory(tests) -endif () +endif() velox_add_library(velox_common_config Config.cpp) velox_link_libraries( velox_common_config - PUBLIC velox_common_base - velox_exception + PUBLIC velox_common_base velox_exception PRIVATE re2::re2) diff --git a/velox/common/memory/SharedArbitrator.cpp b/velox/common/memory/SharedArbitrator.cpp index beaa9b8cfe43..fd45ec232344 100644 --- a/velox/common/memory/SharedArbitrator.cpp +++ b/velox/common/memory/SharedArbitrator.cpp @@ -109,16 +109,6 @@ uint64_t SharedArbitrator::ExtraConfig::getMemoryPoolReservedCapacity( config::CapacityUnit::BYTE); } -uint64_t SharedArbitrator::ExtraConfig::getMemoryPoolTransferCapacity( - const std::unordered_map& configs) { - return config::toCapacity( - getConfig( - configs, - kMemoryPoolTransferCapacity, - std::string(kDefaultMemoryPoolTransferCapacity)), - config::CapacityUnit::BYTE); -} - uint64_t SharedArbitrator::ExtraConfig::getMemoryReclaimMaxWaitTimeMs( const std::unordered_map& configs) { return std::chrono::duration_cast( @@ -182,8 +172,6 @@ SharedArbitrator::SharedArbitrator(const Config& config) ExtraConfig::getMemoryPoolInitialCapacity(config.extraConfigs)), memoryPoolReservedCapacity_( ExtraConfig::getMemoryPoolReservedCapacity(config.extraConfigs)), - memoryPoolTransferCapacity_( - ExtraConfig::getMemoryPoolTransferCapacity(config.extraConfigs)), memoryReclaimWaitMs_( ExtraConfig::getMemoryReclaimMaxWaitTimeMs(config.extraConfigs)), globalArbitrationEnabled_( @@ -542,7 +530,7 @@ uint64_t SharedArbitrator::getCapacityGrowthTarget( const MemoryPool& pool, uint64_t requestBytes) const { if (fastExponentialGrowthCapacityLimit_ == 0 && slowCapacityGrowPct_ == 0) { - return std::max(requestBytes, memoryPoolTransferCapacity_); + return requestBytes; } uint64_t targetBytes{0}; const auto capacity = pool.capacity(); @@ -551,8 +539,7 @@ uint64_t SharedArbitrator::getCapacityGrowthTarget( } else { targetBytes = capacity * slowCapacityGrowPct_; } - return std::max( - std::max(requestBytes, targetBytes), memoryPoolTransferCapacity_); + return std::max(requestBytes, targetBytes); } bool SharedArbitrator::growCapacity(MemoryPool* pool, uint64_t requestBytes) { @@ -938,9 +925,8 @@ uint64_t SharedArbitrator::reclaim( MemoryPool* pool, uint64_t targetBytes, bool isLocalArbitration) noexcept { - int64_t bytesToReclaim = std::min( - std::max(targetBytes, memoryPoolTransferCapacity_), - maxReclaimableCapacity(*pool, true)); + int64_t bytesToReclaim = + std::min(targetBytes, maxReclaimableCapacity(*pool, true)); if (bytesToReclaim == 0) { return 0; } diff --git a/velox/common/memory/SharedArbitrator.h b/velox/common/memory/SharedArbitrator.h index 6c784f5373f2..a8af9ca7b92b 100644 --- a/velox/common/memory/SharedArbitrator.h +++ b/velox/common/memory/SharedArbitrator.h @@ -61,15 +61,6 @@ class SharedArbitrator : public memory::MemoryArbitrator { static uint64_t getMemoryPoolReservedCapacity( const std::unordered_map& configs); - /// The minimal memory capacity to transfer out of or into a memory pool - /// during the memory arbitration. - static constexpr std::string_view kMemoryPoolTransferCapacity{ - "memory-pool-transfer-capacity"}; - static constexpr std::string_view kDefaultMemoryPoolTransferCapacity{ - "128MB"}; - static uint64_t getMemoryPoolTransferCapacity( - const std::unordered_map& configs); - /// Specifies the max time to wait for memory reclaim by arbitration. The /// memory reclaim might fail if the max time has exceeded. 
This prevents /// the memory arbitration from getting stuck when the memory reclaim waits @@ -231,11 +222,7 @@ class SharedArbitrator : public memory::MemoryArbitrator { // The adjusted grow bytes based on 'requestBytes'. This 'targetBytes' is a // best effort target, and hence will not be guaranteed. The adjustment is // based on 'SharedArbitrator::fastExponentialGrowthCapacityLimit_' - // 'SharedArbitrator::slowCapacityGrowPct_' and - // 'MemoryArbitrator::memoryPoolTransferCapacity_'. - // - // TODO: deprecate 'MemoryArbitrator::memoryPoolTransferCapacity_' once - // exponential growth works well in production. + // 'SharedArbitrator::slowCapacityGrowPct_' const std::optional targetBytes; // The start time of this arbitration operation. @@ -516,7 +503,6 @@ class SharedArbitrator : public memory::MemoryArbitrator { const uint64_t reservedCapacity_; const uint64_t memoryPoolInitialCapacity_; const uint64_t memoryPoolReservedCapacity_; - const uint64_t memoryPoolTransferCapacity_; const uint64_t memoryReclaimWaitMs_; const bool globalArbitrationEnabled_; const bool checkUsageLeak_; diff --git a/velox/common/memory/tests/MemoryArbitratorTest.cpp b/velox/common/memory/tests/MemoryArbitratorTest.cpp index 61a956d7dde6..2e71aa4390a0 100644 --- a/velox/common/memory/tests/MemoryArbitratorTest.cpp +++ b/velox/common/memory/tests/MemoryArbitratorTest.cpp @@ -144,6 +144,7 @@ TEST_F(MemoryArbitrationTest, queryMemoryCapacity) { ASSERT_FALSE(manager.arbitrator()->growCapacity(rootPool.get(), 6 << 20)); ASSERT_EQ(rootPool->capacity(), 1 << 20); ASSERT_TRUE(manager.arbitrator()->growCapacity(rootPool.get(), 2 << 20)); + ASSERT_TRUE(manager.arbitrator()->growCapacity(rootPool.get(), 1 << 20)); ASSERT_EQ(rootPool->capacity(), 4 << 20); ASSERT_EQ(manager.arbitrator()->stats().freeCapacityBytes, 2 << 20); ASSERT_EQ(manager.arbitrator()->stats().freeReservedCapacityBytes, 2 << 20); @@ -154,19 +155,19 @@ TEST_F(MemoryArbitrationTest, queryMemoryCapacity) { "Exceeded memory pool capacity after attempt to grow capacity through " "arbitration. 
Requestor pool name 'leaf-1.0', request size 7.00MB, " "memory pool capacity 4.00MB, memory pool max capacity 8.00MB"); - ASSERT_EQ(manager.arbitrator()->shrinkCapacity(rootPool.get(), 0), 1 << 20); + ASSERT_EQ(manager.arbitrator()->shrinkCapacity(rootPool.get(), 0), 0); ASSERT_EQ(manager.arbitrator()->shrinkCapacity(leafPool.get(), 0), 0); ASSERT_EQ(manager.arbitrator()->shrinkCapacity(leafPool.get(), 1), 0); ASSERT_EQ(manager.arbitrator()->shrinkCapacity(rootPool.get(), 1), 0); - ASSERT_EQ(rootPool->capacity(), 3 << 20); + ASSERT_EQ(rootPool->capacity(), 4 << 20); static_cast(rootPool.get())->testingSetReservation(0); ASSERT_EQ( manager.arbitrator()->shrinkCapacity(leafPool.get(), 1 << 20), 1 << 20); ASSERT_EQ( manager.arbitrator()->shrinkCapacity(rootPool.get(), 1 << 20), 1 << 20); - ASSERT_EQ(rootPool->capacity(), 1 << 20); - ASSERT_EQ(leafPool->capacity(), 1 << 20); - ASSERT_EQ(manager.arbitrator()->shrinkCapacity(leafPool.get(), 0), 1 << 20); + ASSERT_EQ(rootPool->capacity(), 2 << 20); + ASSERT_EQ(leafPool->capacity(), 2 << 20); + ASSERT_EQ(manager.arbitrator()->shrinkCapacity(leafPool.get(), 0), 2 << 20); ASSERT_EQ(rootPool->capacity(), 0); ASSERT_EQ(leafPool->capacity(), 0); } diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp index cc32ef111791..320aa892fc53 100644 --- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp +++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp @@ -65,8 +65,6 @@ constexpr int64_t MB = 1024L * KB; constexpr uint64_t kMemoryCapacity = 512 * MB; constexpr uint64_t kReservedMemoryCapacity = 128 * MB; constexpr uint64_t kMemoryPoolInitCapacity = 16 * MB; -// TODO(jtan6): Remove after complete transfer capacity deprecation -constexpr uint64_t kMemoryPoolTransferCapacity = 0; constexpr uint64_t kMemoryPoolReservedCapacity = 8 * MB; constexpr uint64_t kFastExponentialGrowthCapacityLimit = 32 * MB; constexpr double kSlowCapacityGrowPct = 0.25; @@ -429,7 +427,6 @@ class MockSharedArbitrationTest : public testing::Test { int64_t reservedMemoryCapacity = kReservedMemoryCapacity, uint64_t memoryPoolInitCapacity = kMemoryPoolInitCapacity, uint64_t memoryPoolReserveCapacity = kMemoryPoolReservedCapacity, - uint64_t memoryPoolTransferCapacity = kMemoryPoolTransferCapacity, uint64_t fastExponentialGrowthCapacityLimit = kFastExponentialGrowthCapacityLimit, double slowCapacityGrowPct = kSlowCapacityGrowPct, @@ -450,8 +447,6 @@ class MockSharedArbitrationTest : public testing::Test { folly::to(memoryPoolInitCapacity) + "B"}, {std::string(ExtraConfig::kMemoryPoolReservedCapacity), folly::to(memoryPoolReserveCapacity) + "B"}, - {std::string(ExtraConfig::kMemoryPoolTransferCapacity), - folly::to(memoryPoolTransferCapacity) + "B"}, {std::string(ExtraConfig::kFastExponentialGrowthCapacityLimit), folly::to(fastExponentialGrowthCapacityLimit) + "B"}, {std::string(ExtraConfig::kSlowCapacityGrowPct), @@ -555,10 +550,6 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) { ASSERT_EQ( SharedArbitrator::ExtraConfig::getMemoryPoolInitialCapacity(emptyConfigs), 256 << 20); - ASSERT_EQ( - SharedArbitrator::ExtraConfig::getMemoryPoolTransferCapacity( - emptyConfigs), - 128 << 20); ASSERT_EQ( SharedArbitrator::ExtraConfig::getMemoryReclaimMaxWaitTimeMs( emptyConfigs), @@ -578,8 +569,6 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) { SharedArbitrator::ExtraConfig::kMemoryPoolInitialCapacity)] = "512MB"; configs[std::string( SharedArbitrator::ExtraConfig::kMemoryPoolReservedCapacity)] = "200B"; - 
configs[std::string( - SharedArbitrator::ExtraConfig::kMemoryPoolTransferCapacity)] = "256MB"; configs[std::string( SharedArbitrator::ExtraConfig::kMemoryReclaimMaxWaitTime)] = "5000ms"; configs[std::string( @@ -593,9 +582,6 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) { ASSERT_EQ( SharedArbitrator::ExtraConfig::getMemoryPoolReservedCapacity(configs), 200); - ASSERT_EQ( - SharedArbitrator::ExtraConfig::getMemoryPoolTransferCapacity(configs), - 256 << 20); ASSERT_EQ( SharedArbitrator::ExtraConfig::getMemoryReclaimMaxWaitTimeMs(configs), 5000); @@ -610,8 +596,6 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) { SharedArbitrator::ExtraConfig::kMemoryPoolInitialCapacity)] = "invalid"; configs[std::string( SharedArbitrator::ExtraConfig::kMemoryPoolReservedCapacity)] = "invalid"; - configs[std::string( - SharedArbitrator::ExtraConfig::kMemoryPoolTransferCapacity)] = "invalid"; configs[std::string( SharedArbitrator::ExtraConfig::kMemoryReclaimMaxWaitTime)] = "invalid"; configs[std::string( @@ -627,9 +611,6 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) { VELOX_ASSERT_THROW( SharedArbitrator::ExtraConfig::getMemoryPoolReservedCapacity(configs), "Invalid capacity string 'invalid'"); - VELOX_ASSERT_THROW( - SharedArbitrator::ExtraConfig::getMemoryPoolTransferCapacity(configs), - "Invalid capacity string 'invalid'"); VELOX_ASSERT_THROW( SharedArbitrator::ExtraConfig::getMemoryReclaimMaxWaitTimeMs(configs), "Invalid duration 'invalid'"); @@ -678,7 +659,7 @@ TEST_F(MockSharedArbitrationTest, arbitrationStateCheck) { ASSERT_TRUE(RE2::FullMatch(pool.name(), re)) << pool.name(); ++checkCount; }; - setupMemory(memCapacity, 0, 0, 0, 0, 0, 0, 0, 0, checkCountCb); + setupMemory(memCapacity, 0, 0, 0, 0, 0, 0, 0, checkCountCb); const int numTasks{5}; std::vector> tasks; @@ -703,7 +684,7 @@ TEST_F(MockSharedArbitrationTest, arbitrationStateCheck) { MemoryArbitrationStateCheckCB badCheckCb = [&](MemoryPool& /*unused*/) { VELOX_FAIL("bad check"); }; - setupMemory(memCapacity, 0, 0, 0, 0, 0, 0, 0, 0, badCheckCb); + setupMemory(memCapacity, 0, 0, 0, 0, 0, 0, 0, badCheckCb); std::shared_ptr task = addTask(kMemoryCapacity); ASSERT_EQ(task->capacity(), 0); MockMemoryOperator* memOp = task->addMemoryOp(); @@ -1238,7 +1219,6 @@ DEBUG_ONLY_TEST_F( 0, memoryPoolInitCapacity, 0, - 0, kFastExponentialGrowthCapacityLimit, kSlowCapacityGrowPct, 0, @@ -1331,7 +1311,6 @@ DEBUG_ONLY_TEST_F( 0, memoryPoolInitCapacity, 0, - 0, kFastExponentialGrowthCapacityLimit, kSlowCapacityGrowPct, 0, @@ -1615,7 +1594,6 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, globalArbitrationEnableCheck) { 0, memoryPoolInitCapacity, 0, - 0, kFastExponentialGrowthCapacityLimit, kSlowCapacityGrowPct, kMemoryPoolMinFreeCapacity, @@ -1652,7 +1630,6 @@ DEBUG_ONLY_TEST_F( 0, memoryPoolInitCapacity, 0, - 0, kFastExponentialGrowthCapacityLimit, kSlowCapacityGrowPct, 0, @@ -1871,7 +1848,6 @@ TEST_F(MockSharedArbitrationTest, singlePoolShrinkWithoutArbitration) { 0, 0, 0, - 0, testParam.memoryPoolMinFreeCapacity, testParam.memoryPoolMinFreeCapacityPct), "both need to be set (non-zero) at the same time to enable shrink " @@ -1885,7 +1861,6 @@ TEST_F(MockSharedArbitrationTest, singlePoolShrinkWithoutArbitration) { 0, 0, 0, - 0, testParam.memoryPoolMinFreeCapacity, testParam.memoryPoolMinFreeCapacityPct); } @@ -1931,7 +1906,6 @@ TEST_F(MockSharedArbitrationTest, singlePoolGrowWithoutArbitration) { 0, memoryPoolInitCapacity, 0, - 0, testParam.fastExponentialGrowthCapacityLimit, testParam.slowCapacityGrowPct); @@ -2146,7 +2120,6 @@ 
TEST_F(MockSharedArbitrationTest, ensureMemoryPoolMaxCapacity) { 0, poolInitCapacity, 0, - 0, kFastExponentialGrowthCapacityLimit, kSlowCapacityGrowPct, 0, @@ -2681,7 +2654,7 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, failedToReclaimFromRequestor) { 0}}; for (const auto& testData : testSettings) { SCOPED_TRACE(testData.debugString()); - setupMemory(kMemoryCapacity, 0, kMemoryPoolInitCapacity, 0, 0, 0, 0, 0, 0); + setupMemory(kMemoryCapacity, 0, kMemoryPoolInitCapacity, 0, 0, 0, 0, 0); std::vector> otherTasks; std::vector otherTaskOps; @@ -2865,7 +2838,7 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, failedToReclaimFromOtherTask) { nonFailTaskMemoryCapacity}}; for (const auto& testData : testSettings) { SCOPED_TRACE(testData.debugString()); - setupMemory(kMemoryCapacity, 0, kMemoryPoolInitCapacity, 0, 0, 0, 0, 0, 0); + setupMemory(kMemoryCapacity, 0, kMemoryPoolInitCapacity, 0, 0, 0, 0, 0); std::vector> nonFailedTasks; std::vector nonFailedTaskOps; @@ -3015,7 +2988,6 @@ TEST_F(MockSharedArbitrationTest, memoryPoolAbortThrow) { 0, kMemoryPoolInitCapacity, 0, - 0, kFastExponentialGrowthCapacityLimit, kSlowCapacityGrowPct, 0, @@ -3065,7 +3037,7 @@ TEST_F(MockSharedArbitrationTest, memoryPoolAbortThrow) { // This test makes sure the memory capacity grows as expected. DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, concurrentArbitrationRequests) { - setupMemory(kMemoryCapacity, 0, 0, 0, 128 << 20); + setupMemory(kMemoryCapacity, 0, 0, 0); std::shared_ptr task = addTask(); MockMemoryOperator* op1 = addMemoryOp(task); MockMemoryOperator* op2 = addMemoryOp(task); diff --git a/velox/common/time/CpuWallTimer.cpp b/velox/common/time/CpuWallTimer.cpp index 7a1e137ba91f..e21944b05d1c 100644 --- a/velox/common/time/CpuWallTimer.cpp +++ b/velox/common/time/CpuWallTimer.cpp @@ -17,6 +17,7 @@ #include "velox/common/time/CpuWallTimer.h" namespace facebook::velox { + CpuWallTimer::CpuWallTimer(CpuWallTiming& timing) : timing_(timing) { ++timing_.count; cpuTimeStart_ = process::threadCpuNanos(); @@ -29,4 +30,5 @@ CpuWallTimer::~CpuWallTimer() { std::chrono::steady_clock::now() - wallTimeStart_); timing_.wallNanos += duration.count(); } + } // namespace facebook::velox diff --git a/velox/common/time/CpuWallTimer.h b/velox/common/time/CpuWallTimer.h index f60f23c19dc9..6562364a2942 100644 --- a/velox/common/time/CpuWallTimer.h +++ b/velox/common/time/CpuWallTimer.h @@ -57,19 +57,14 @@ class CpuWallTimer { CpuWallTiming& timing_; }; -// Keeps track of elapsed CPU and wall time from construction time. -// Composes delta CpuWallTiming upon destruction and passes it to the user -// callback, where it can be added to the user's CpuWallTiming using -// CpuWallTiming::add(). -template -class DeltaCpuWallTimer { +/// Keeps track of elapsed CPU and wall time from construction time. +class DeltaCpuWallTimeStopWatch { public: - explicit DeltaCpuWallTimer(F&& func) + explicit DeltaCpuWallTimeStopWatch() : wallTimeStart_(std::chrono::steady_clock::now()), - cpuTimeStart_(process::threadCpuNanos()), - func_(std::move(func)) {} + cpuTimeStart_(process::threadCpuNanos()) {} - ~DeltaCpuWallTimer() { + CpuWallTiming elapsed() const { // NOTE: End the cpu-time timing first, and then end the wall-time timing, // so as to avoid the counter-intuitive phenomenon that the final calculated // cpu-time is slightly larger than the wall-time. 
@@ -78,8 +73,7 @@ class DeltaCpuWallTimer { std::chrono::duration_cast( std::chrono::steady_clock::now() - wallTimeStart_) .count(); - const CpuWallTiming deltaTiming{1, wallTimeDuration, cpuTimeDuration}; - func_(deltaTiming); + return CpuWallTiming{1, wallTimeDuration, cpuTimeDuration}; } private: @@ -87,6 +81,22 @@ class DeltaCpuWallTimer { // counting earlier than cpu-time. const std::chrono::steady_clock::time_point wallTimeStart_; const uint64_t cpuTimeStart_; +}; + +/// Composes delta CpuWallTiming upon destruction and passes it to the user +/// callback, where it can be added to the user's CpuWallTiming using +/// CpuWallTiming::add(). +template +class DeltaCpuWallTimer { + public: + explicit DeltaCpuWallTimer(F&& func) : func_(std::move(func)) {} + + ~DeltaCpuWallTimer() { + func_(timer_.elapsed()); + } + + private: + DeltaCpuWallTimeStopWatch timer_; F func_; }; diff --git a/velox/docs/develop/memory.rst b/velox/docs/develop/memory.rst index c5ed4a63fe27..06ff5fe667ed 100644 --- a/velox/docs/develop/memory.rst +++ b/velox/docs/develop/memory.rst @@ -610,10 +610,7 @@ The end-to-end memory arbitration process in *SharedArbitrator* works as follows memory reservations from the candidate query pools without actually freeing the used memory. It first tries to reclaim from itself and then from the candidate pools which have the most free capacity - (*MemoryPool::freeBytes*) until it reaches the memory reclaim target. Note - that we set the memory reclaim target to a large value - (*MemoryManagerOptions::memoryPoolTransferCapacity*) which could be more - than the actual needed size, to avoid the frequent memory arbitrations. + (*MemoryPool::freeBytes*) until it reaches the memory reclaim target. d. If the memory arbitrator hasn’t reclaimed enough free memory on fast path, it runs the slow path diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 307ba0d32370..2e6794e43b18 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -95,6 +95,10 @@ const core::QueryConfig& DriverCtx::queryConfig() const { return task->queryCtx()->queryConfig(); } +const std::optional& DriverCtx::traceConfig() const { + return task->queryTraceConfig(); +} + velox::memory::MemoryPool* DriverCtx::addOperatorPool( const core::PlanNodeId& planNodeId, const std::string& operatorType) { @@ -348,7 +352,7 @@ void Driver::enqueueInternal() { queueTimeStartUs_ = getCurrentTimeMicro(); } -// Call an Oprator method. record silenced throws, but not a query +// Call an Operator method. record silenced throws, but not a query // terminating throw. Annotate exceptions with Operator info. #define CALL_OPERATOR(call, operatorPtr, operatorId, operatorMethod) \ try { \ @@ -395,20 +399,22 @@ size_t OpCallStatusRaw::callDuration() const { : fmt::format("null::{}", operatorMethod); } -CpuWallTiming Driver::processLazyTiming( +CpuWallTiming Driver::processLazyIoStats( Operator& op, const CpuWallTiming& timing) { if (&op == operators_[0].get()) { return timing; } auto lockStats = op.stats().wlock(); + + // Checks and tries to update cpu time from lazy loads. auto it = lockStats->runtimeStats.find(LazyVector::kCpuNanos); if (it == lockStats->runtimeStats.end()) { // Return early if no lazy activity. Lazy CPU and wall times are recorded // together, checking one is enough. return timing; } - int64_t cpu = it->second.sum; + const int64_t cpu = it->second.sum; auto cpuDelta = std::max(0, cpu - lockStats->lastLazyCpuNanos); if (cpuDelta == 0) { // Return early if no change. Checking one counter is enough. 
If this did @@ -417,15 +423,29 @@ CpuWallTiming Driver::processLazyTiming( return timing; } lockStats->lastLazyCpuNanos = cpu; + + // Checks and tries to update wall time from lazy loads. int64_t wallDelta = 0; it = lockStats->runtimeStats.find(LazyVector::kWallNanos); if (it != lockStats->runtimeStats.end()) { - int64_t wall = it->second.sum; + const int64_t wall = it->second.sum; wallDelta = std::max(0, wall - lockStats->lastLazyWallNanos); if (wallDelta > 0) { lockStats->lastLazyWallNanos = wall; } } + + // Checks and tries to update input bytes from lazy loads. + int64_t inputBytesDelta = 0; + it = lockStats->runtimeStats.find(LazyVector::kInputBytes); + if (it != lockStats->runtimeStats.end()) { + const int64_t inputBytes = it->second.sum; + inputBytesDelta = inputBytes - lockStats->lastLazyInputBytes; + if (inputBytesDelta > 0) { + lockStats->lastLazyInputBytes = inputBytes; + } + } + lockStats.unlock(); cpuDelta = std::min(cpuDelta, timing.cpuNanos); wallDelta = std::min(wallDelta, timing.wallNanos); @@ -435,6 +455,8 @@ CpuWallTiming Driver::processLazyTiming( static_cast(wallDelta), static_cast(cpuDelta), }); + lockStats->inputBytes += inputBytesDelta; + lockStats->outputBytes += inputBytesDelta; return CpuWallTiming{ 1, timing.wallNanos - wallDelta, @@ -1012,7 +1034,7 @@ void Driver::withDeltaCpuWallTimer( // opTimingMember upon destruction of the timer when withDeltaCpuWallTimer // ends. The timer is created on the stack to avoid heap allocation auto f = [op, opTimingMember, this](const CpuWallTiming& elapsedTime) { - auto elapsedSelfTime = processLazyTiming(*op, elapsedTime); + auto elapsedSelfTime = processLazyIoStats(*op, elapsedTime); op->stats().withWLock([&](auto& lockedStats) { (lockedStats.*opTimingMember).add(elapsedSelfTime); }); diff --git a/velox/exec/Driver.h b/velox/exec/Driver.h index 7551b2b304d7..a1f9d087321d 100644 --- a/velox/exec/Driver.h +++ b/velox/exec/Driver.h @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + #pragma once #include @@ -26,6 +27,7 @@ #include "velox/common/time/CpuWallTimer.h" #include "velox/core/PlanFragment.h" #include "velox/core/QueryCtx.h" +#include "velox/exec/trace/QueryTraceConfig.h" namespace facebook::velox::exec { @@ -285,6 +287,8 @@ struct DriverCtx { const core::QueryConfig& queryConfig() const; + const std::optional& traceConfig() const; + velox::memory::MemoryPool* addOperatorPool( const core::PlanNodeId& planNodeId, const std::string& operatorType); @@ -528,13 +532,12 @@ class Driver : public std::enable_shared_from_this { TimingMemberPtr opTimingMember, Func&& opFunction); - // Adjusts 'timing' by removing the lazy load wall and CPU times - // accrued since last time timing information was recorded for - // 'op'. The accrued lazy load times are credited to the source - // operator of 'this'. The per-operator runtimeStats for lazy load - // are left in place to reflect which operator triggered the load - // but these do not bias the op's timing. - CpuWallTiming processLazyTiming(Operator& op, const CpuWallTiming& timing); + // Adjusts 'timing' by removing the lazy load wall time, CPU time, and input + // bytes accrued since last time timing information was recorded for 'op'. The + // accrued lazy load times are credited to the source operator of 'this'. The + // per-operator runtimeStats for lazy load are left in place to reflect which + // operator triggered the load but these do not bias the op's timing. 
+ CpuWallTiming processLazyIoStats(Operator& op, const CpuWallTiming& timing); inline void validateOperatorOutputResult( const RowVectorPtr& result, diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 4acfceb4d1d2..8c0c44f7ae9b 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -22,6 +22,7 @@ #include "velox/exec/HashJoinBridge.h" #include "velox/exec/OperatorUtils.h" #include "velox/exec/Task.h" +#include "velox/exec/trace/QueryTraceUtil.h" #include "velox/expression/Expr.h" using facebook::velox::common::testutil::TestValue; @@ -104,6 +105,50 @@ void Operator::maybeSetReclaimer() { Operator::MemoryReclaimer::create(operatorCtx_->driverCtx(), this)); } +void Operator::maybeSetTracer() { + const auto& queryTraceConfig = operatorCtx_->driverCtx()->traceConfig(); + if (!queryTraceConfig.has_value()) { + return; + } + + if (operatorCtx_->driverCtx()->queryConfig().queryTraceMaxBytes() == 0) { + return; + } + + if (queryTraceConfig->queryNodes.count(planNodeId()) == 0) { + return; + } + + const auto pipelineId = operatorCtx_->driverCtx()->pipelineId; + const auto driverId = operatorCtx_->driverCtx()->driverId; + LOG(INFO) << "Trace data for operator type: " << operatorType() + << ", operator id: " << operatorId() << ", pipeline: " << pipelineId + << ", driver: " << driverId << ", task: " << taskId(); + const auto opTraceDirPath = fmt::format( + "{}/{}/{}/{}/data", + queryTraceConfig->queryTraceDir, + planNodeId(), + pipelineId, + driverId); + trace::createTraceDirectory(opTraceDirPath); + inputTracer_ = std::make_unique( + opTraceDirPath, + memory::traceMemoryPool(), + queryTraceConfig->updateAndCheckTraceLimitCB); +} + +void Operator::traceInput(const RowVectorPtr& input) { + if (FOLLY_UNLIKELY(inputTracer_ != nullptr)) { + inputTracer_->write(input); + } +} + +void Operator::finishTrace() { + if (inputTracer_ != nullptr) { + inputTracer_->finish(); + } +} + std::vector>& Operator::translators() { static std::vector> translators; @@ -153,6 +198,7 @@ void Operator::initialize() { pool()->name()); initialized_ = true; maybeSetReclaimer(); + maybeSetTracer(); } // static diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index a369b4ffcf2f..711a59da0c71 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -14,6 +14,9 @@ * limitations under the License. */ #pragma once + +#include "velox/exec/trace/QueryDataWriter.h" + #include #include "velox/common/base/RuntimeMetrics.h" #include "velox/common/time/CpuWallTimer.h" @@ -174,6 +177,7 @@ struct OperatorStats { // Last recorded values for lazy loading times for loads triggered by 'this'. int64_t lastLazyCpuNanos{0}; int64_t lastLazyWallNanos{0}; + int64_t lastLazyInputBytes{0}; // Total null keys processed by the operator. // Currently populated only by HashJoin/HashBuild. @@ -398,6 +402,7 @@ class Operator : public BaseRuntimeStatWriter { /// e.g. the first operator in the pipeline. virtual void noMoreInput() { noMoreInput_ = true; + finishTrace(); } /// Returns a RowVector with the result columns. Returns nullptr if @@ -422,6 +427,12 @@ class Operator : public BaseRuntimeStatWriter { /// build side is empty. virtual bool isFinished() = 0; + /// Traces input batch of the operator. + virtual void traceInput(const RowVectorPtr&); + + /// Finishes tracing of the operator. + virtual void finishTrace(); + /// Returns single-column dynamically generated filters to be pushed down to /// upstream operators. 
Used to push down filters on join keys from broadcast /// hash join into probe-side table scan. Can also be used to push down TopN @@ -723,6 +734,10 @@ class Operator : public BaseRuntimeStatWriter { return spillConfig_.has_value() ? &spillConfig_.value() : nullptr; } + /// Invoked to setup query data writer for this operator if the associated + /// query plan node is configured to collect trace. + void maybeSetTracer(); + /// Creates output vector from 'input_' and 'results' according to /// 'identityProjections_' and 'resultProjections_'. If 'mapping' is set to /// nullptr, the children of the output vector will be identical to their @@ -760,6 +775,7 @@ class Operator : public BaseRuntimeStatWriter { folly::Synchronized stats_; folly::Synchronized spillStats_; + std::unique_ptr inputTracer_; /// Indicates if an operator is under a non-reclaimable execution section. /// This prevents the memory arbitrator from reclaiming memory from this diff --git a/velox/exec/PartitionedOutput.h b/velox/exec/PartitionedOutput.h index 28ef0877458d..4589646a167f 100644 --- a/velox/exec/PartitionedOutput.h +++ b/velox/exec/PartitionedOutput.h @@ -41,7 +41,7 @@ class Destination { setTargetSizePct(); } - // Resets the destination before starting a new batch. + /// Resets the destination before starting a new batch. void beginBatch() { rows_.clear(); rowIdx_ = 0; @@ -57,7 +57,8 @@ class Destination { } } - // Serializes row from 'output' till either 'maxBytes' have been serialized or + /// Serializes row from 'output' till either 'maxBytes' have been serialized + /// or BlockingReason advance( uint64_t maxBytes, const std::vector& sizes, @@ -136,19 +137,19 @@ class Destination { }; } // namespace detail -// In a distributed query engine data needs to be shuffled between workers so -// that each worker only has to process a fraction of the total data. Because -// rows are usually not pre-ordered based on the hash of the partition key for -// an operation (for example join columns, or group by columns), repartitioning -// is needed to send the rows to the right workers. PartitionedOutput operator -// is responsible for this process: it takes a stream of data that is not -// partitioned, and divides the stream into a series of output data ready to be -// sent to other workers. This operator is also capable of re-ordering and -// dropping columns from its input. +/// In a distributed query engine data needs to be shuffled between workers so +/// that each worker only has to process a fraction of the total data. Because +/// rows are usually not pre-ordered based on the hash of the partition key for +/// an operation (for example join columns, or group by columns), repartitioning +/// is needed to send the rows to the right workers. PartitionedOutput operator +/// is responsible for this process: it takes a stream of data that is not +/// partitioned, and divides the stream into a series of output data ready to be +/// sent to other workers. This operator is also capable of re-ordering and +/// dropping columns from its input. class PartitionedOutput : public Operator { public: - // Minimum flush size for non-final flush. 60KB + overhead fits a - // network MTU of 64K. + /// Minimum flush size for non-final flush. 60KB + overhead fits a + /// network MTU of 64K. static constexpr uint64_t kMinDestinationSize = 60 * 1024; PartitionedOutput( @@ -159,13 +160,13 @@ class PartitionedOutput : public Operator { void addInput(RowVectorPtr input) override; - // Always returns nullptr. 
The action is to further process - // unprocessed input. If all input has been processed, 'this' is in - // a non-blocked state, otherwise blocked. + /// Always returns nullptr. The action is to further process + /// unprocessed input. If all input has been processed, 'this' is in + /// a non-blocked state, otherwise blocked. RowVectorPtr getOutput() override; - // always true but the caller will check isBlocked before adding input, hence - // the blocked state does not accumulate input. + /// always true but the caller will check isBlocked before adding input, hence + /// the blocked state does not accumulate input. bool needsInput() const override { return true; } @@ -202,7 +203,7 @@ class PartitionedOutput : public Operator { void estimateRowSizes(); - /// Collect all rows with null keys into nullRows_. + // Collect all rows with null keys into nullRows_. void collectNullRows(); // If compression in serde is enabled, this is the minimum compression that @@ -223,7 +224,10 @@ class PartitionedOutput : public Operator { BlockingReason blockingReason_{BlockingReason::kNotBlocked}; ContinueFuture future_; bool finished_{false}; + // Contains pointers to 'rowSize_' elements. 'sizePointers_[i]' contains a + // pointer to 'rowSize_[i]'. std::vector sizePointers_; + // The estimated row size for each row. Index maps back to 'output_' index std::vector rowSize_; std::vector> destinations_; bool replicatedAny_{false}; diff --git a/velox/exec/RowContainer.cpp b/velox/exec/RowContainer.cpp index 8566f5fb1212..ed99701ce9d6 100644 --- a/velox/exec/RowContainer.cpp +++ b/velox/exec/RowContainer.cpp @@ -336,7 +336,7 @@ char* RowContainer::initializeRow(char* row, bool reuse) { } void RowContainer::eraseRows(folly::Range rows) { - freeRowsExtraMemory(rows, false); + freeRowsExtraMemory(rows, /*freeNextRowVector=*/true); for (auto* row : rows) { VELOX_CHECK(!bits::isBitSet(row, freeFlagOffset_), "Double free of row"); bits::setBit(row, freeFlagOffset_); @@ -457,31 +457,11 @@ void RowContainer::freeAggregates(folly::Range rows) { } } -void RowContainer::freeNextRowVectors(folly::Range rows, bool clear) { +void RowContainer::freeNextRowVectors(folly::Range rows) { if (!nextOffset_ || !hasDuplicateRows_) { return; } - if (clear) { - for (auto row : rows) { - auto vector = getNextRowVector(row); - if (vector) { - // Clear all rows, we can clear the nextOffset_ slots and delete the - // next-row-vector. - for (auto& next : *vector) { - getNextRowVector(next) = nullptr; - } - // Because of 'parallelJoinBuild', the memory for the next row vector - // may not be allocated from the RowContainer to which the row belongs, - // hence we need to release memory through the vector's allocator. 
- auto allocator = vector->get_allocator().allocator(); - std::destroy_at(vector); - allocator->free(HashStringAllocator::headerOf(vector)); - } - } - return; - } - for (auto row : rows) { auto vector = getNextRowVector(row); if (vector) { @@ -500,10 +480,14 @@ void RowContainer::freeNextRowVectors(folly::Range rows, bool clear) { } } -void RowContainer::freeRowsExtraMemory(folly::Range rows, bool clear) { +void RowContainer::freeRowsExtraMemory( + folly::Range rows, + bool freeNextRowVector) { freeVariableWidthFields(rows); freeAggregates(rows); - freeNextRowVectors(rows, clear); + if (freeNextRowVector) { + freeNextRowVectors(rows); + } numRows_ -= rows.size(); } @@ -959,7 +943,9 @@ void RowContainer::clear() { std::vector rows(kBatch); RowContainerIterator iter; while (auto numRows = listRows(&iter, kBatch, rows.data())) { - freeRowsExtraMemory(folly::Range(rows.data(), numRows), true); + freeRowsExtraMemory( + folly::Range(rows.data(), numRows), + /*freeNextRowVector=*/false); } } hasDuplicateRows_ = false; diff --git a/velox/exec/RowContainer.h b/velox/exec/RowContainer.h index 39ae2aa568c4..7dc2303d582e 100644 --- a/velox/exec/RowContainer.h +++ b/velox/exec/RowContainer.h @@ -1368,9 +1368,9 @@ class RowContainer { void freeAggregates(folly::Range rows); // Free next row vectors associated with the 'rows'. - void freeNextRowVectors(folly::Range rows, bool clear); + void freeNextRowVectors(folly::Range rows); - void freeRowsExtraMemory(folly::Range rows, bool clear); + void freeRowsExtraMemory(folly::Range rows, bool freeNextRowVector); // Updates the specific column's columnHasNulls_ flag, if 'hasNulls' is true. // columnHasNulls_ flag is false by default. diff --git a/velox/exec/TableWriter.cpp b/velox/exec/TableWriter.cpp index 9bd199cfe9ff..2ce07af95cd1 100644 --- a/velox/exec/TableWriter.cpp +++ b/velox/exec/TableWriter.cpp @@ -119,14 +119,14 @@ void TableWriter::addInput(RowVectorPtr input) { if (input->size() == 0) { return; } - + traceInput(input); std::vector mappedChildren; mappedChildren.reserve(inputMapping_.size()); - for (auto i : inputMapping_) { + for (const auto i : inputMapping_) { mappedChildren.emplace_back(input->childAt(i)); } - auto mappedInput = std::make_shared( + const auto mappedInput = std::make_shared( input->pool(), mappedType_, input->nulls(), diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 66083ceb5d3e..4fba62147af5 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -209,9 +209,6 @@ std::string taskStateString(TaskState state) { } } -std::atomic Task::numCreatedTasks_ = 0; -std::atomic Task::numDeletedTasks_ = 0; - bool registerTaskListener(std::shared_ptr listener) { return listeners().withWLock([&](auto& listeners) { for (const auto& existingListener : listeners) { @@ -277,6 +274,7 @@ std::shared_ptr Task::create( std::move(consumerSupplier), std::move(onError))); task->initTaskPool(); + task->addToTaskList(); return task; } @@ -310,6 +308,9 @@ Task::Task( } Task::~Task() { + SCOPE_EXIT { + removeFromTaskList(); + }; // TODO(spershin): Temporary code designed to reveal what causes SIGABRT in // jemalloc when destroying some Tasks. 
std::string clearStage; @@ -355,6 +356,48 @@ Task::~Task() { } } +Task::TaskList& Task::taskList() { + static TaskList taskList; + return taskList; +} + +folly::SharedMutex& Task::taskListLock() { + static folly::SharedMutex lock; + return lock; +} + +size_t Task::numRunningTasks() { + std::shared_lock guard{taskListLock()}; + return taskList().size(); +} + +std::vector> Task::getRunningTasks() { + std::vector> tasks; + std::shared_lock guard(taskListLock()); + tasks.reserve(taskList().size()); + for (auto taskEntry : taskList()) { + if (auto task = taskEntry.taskPtr.lock()) { + tasks.push_back(std::move(task)); + } + } + return tasks; +} + +void Task::addToTaskList() { + VELOX_CHECK(!taskListEntry_.listHook.is_linked()); + taskListEntry_.taskPtr = shared_from_this(); + + std::unique_lock guard{taskListLock()}; + taskList().push_back(taskListEntry_); +} + +void Task::removeFromTaskList() { + std::unique_lock guard{taskListLock()}; + if (taskListEntry_.listHook.is_linked()) { + taskListEntry_.listHook.unlink(); + } +} + uint64_t Task::timeSinceStartMsLocked() const { if (taskStats_.executionStartTimeMs == 0UL) { return 0UL; @@ -2856,16 +2899,20 @@ std::optional Task::maybeMakeTraceConfig() const { return std::nullopt; } + const auto traceDir = + fmt::format("{}/{}", queryConfig.queryTraceDir(), taskId_); const auto queryTraceNodes = queryConfig.queryTraceNodeIds(); if (queryTraceNodes.empty()) { - return trace::QueryTraceConfig(queryConfig.queryTraceDir()); + LOG(INFO) << "Trace metadata for task: " << taskId_; + return trace::QueryTraceConfig(traceDir); } std::vector nodes; folly::split(',', queryTraceNodes, nodes); std::unordered_set nodeSet(nodes.begin(), nodes.end()); VELOX_CHECK_EQ(nodeSet.size(), nodes.size()); - LOG(INFO) << "Query trace plan node ids: " << queryTraceNodes; + LOG(INFO) << "Trace data for task " << taskId_ << " with plan nodes " + << queryTraceNodes; trace::UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB = [this](uint64_t bytes) { @@ -2873,7 +2920,7 @@ std::optional Task::maybeMakeTraceConfig() const { }; return trace::QueryTraceConfig( std::move(nodeSet), - queryConfig.queryTraceDir(), + traceDir, std::move(updateAndCheckTraceLimitCB), queryConfig.queryTraceTaskRegExp()); } @@ -2883,11 +2930,9 @@ void Task::maybeInitQueryTrace() { return; } - const auto traceTaskDir = - fmt::format("{}/{}", traceConfig_->queryTraceDir, taskId_); - trace::createTraceDirectory(traceTaskDir); + trace::createTraceDirectory(traceConfig_->queryTraceDir); const auto queryMetadatWriter = std::make_unique( - traceTaskDir, memory::traceMemoryPool()); + traceConfig_->queryTraceDir, memory::traceMemoryPool()); queryMetadatWriter->write(queryCtx_, planFragment_.planNode); } diff --git a/velox/exec/Task.h b/velox/exec/Task.h index b237f4220231..9d66c1e1548c 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -138,6 +138,11 @@ class Task : public std::enable_shared_from_this { return pool_.get(); } + /// Returns query trace config if specified. + const std::optional& queryTraceConfig() const { + return traceConfig_; + } + /// Returns ConsumerSupplier passed in the constructor. ConsumerSupplier consumerSupplier() const { return consumerSupplier_; @@ -635,16 +640,6 @@ class Task : public std::enable_shared_from_this { return driverFactories_[driver->driverCtx()->pipelineId]->numDrivers; } - /// Returns the number of created and deleted tasks since the velox engine - /// starts running so far. 
- static uint64_t numCreatedTasks() { - return numCreatedTasks_; - } - - static uint64_t numDeletedTasks() { - return numDeletedTasks_; - } - const std::string& spillDirectory() const { return spillDirectory_; } @@ -672,6 +667,12 @@ class Task : public std::enable_shared_from_this { ++numThreads_; } + /// Returns the number of running tasks from velox runtime. + static size_t numRunningTasks(); + + /// Returns the list of running tasks from velox runtime. + static std::vector> getRunningTasks(); + /// Invoked to run provided 'callback' on each alive driver of the task. void testingVisitDrivers(const std::function& callback); @@ -681,6 +682,20 @@ class Task : public std::enable_shared_from_this { } private: + // Hook of system-wide running task list. + struct TaskListEntry { + std::weak_ptr taskPtr; + folly::IntrusiveListHook listHook; + }; + using TaskList = + folly::IntrusiveList; + + // Returns the system-wide running task list. + FOLLY_EXPORT static TaskList& taskList(); + + // Returns the lock that protects the system-wide running task list. + FOLLY_EXPORT static folly::SharedMutex& taskListLock(); + Task( const std::string& taskId, core::PlanFragment planFragment, @@ -690,6 +705,13 @@ class Task : public std::enable_shared_from_this { ConsumerSupplier consumerSupplier, std::function onError = nullptr); + // Invoked to add this to the system-wide running task list on task creation. + void addToTaskList(); + + // Invoked to remove this from the system-wide running task list on task + // destruction. + void removeFromTaskList(); + // Consistency check of the task execution to make sure the execution mode // stays the same. void checkExecutionMode(ExecutionMode mode); @@ -811,22 +833,6 @@ class Task : public std::enable_shared_from_this { std::weak_ptr task_; }; - // Counts the number of created tasks which is incremented on each task - // creation. - static std::atomic numCreatedTasks_; - - // Counts the number of deleted tasks which is incremented on each task - // destruction. - static std::atomic numDeletedTasks_; - - static void taskCreated() { - ++numCreatedTasks_; - } - - static void taskDeleted() { - ++numDeletedTasks_; - } - /// Returns true if state is 'running'. bool isRunningLocked() const; @@ -979,26 +985,6 @@ class Task : public std::enable_shared_from_this { // trace enabled. void maybeInitQueryTrace(); - // The helper class used to maintain 'numCreatedTasks_' and 'numDeletedTasks_' - // on task construction and destruction. - class TaskCounter { - public: - TaskCounter() { - Task::taskCreated(); - } - ~TaskCounter() { - Task::taskDeleted(); - } - }; - friend class Task::TaskCounter; - - // NOTE: keep 'taskCount_' the first member so that it will be the first - // constructed member and the last destructed one. The purpose is to make - // 'numCreatedTasks_' and 'numDeletedTasks_' counting more robust to the - // timing race condition when used in scenarios such as waiting for all the - // tasks to be destructed in test. - const TaskCounter taskCounter_; - // Universally unique identifier of the task. Used to identify the task when // calling TaskListener. const std::string uuid_; @@ -1015,6 +1001,9 @@ class Task : public std::enable_shared_from_this { // executed in a single mode throughout its lifetime const ExecutionMode mode_; + // Hook in the system wide task list. + TaskListEntry taskListEntry_; + // Root MemoryPool for this Task. All member variables that hold references // to pool_ must be defined after pool_, childPools_. 
std::shared_ptr pool_; diff --git a/velox/exec/WindowPartition.cpp b/velox/exec/WindowPartition.cpp index 162817e80d4d..6995493b4c01 100644 --- a/velox/exec/WindowPartition.cpp +++ b/velox/exec/WindowPartition.cpp @@ -363,8 +363,11 @@ void WindowPartition::updateKRangeFrameBounds( vector_size_t start = 0; vector_size_t end; + // frameColumn is a column index into the original input rows, while + // orderByColumn is a column index into rows in data_ after the columns are + // reordered as per inputMapping_. RowColumn frameRowColumn = columns_[frameColumn]; - RowColumn orderByRowColumn = columns_[inputMapping_[orderByColumn]]; + RowColumn orderByRowColumn = data_->columnAt(orderByColumn); for (auto i = 0; i < numRows; i++) { auto currentRow = startRow + i; auto* partitionRow = partition_[currentRow]; diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index 7c497c55cb50..9ce08a5e5e5e 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -7506,7 +7506,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, taskWaitTimeout) { for (uint64_t timeoutMs : {0, 1'000, 30'000}) { SCOPED_TRACE(fmt::format("timeout {}", succinctMillis(timeoutMs))); - auto memoryManager = createMemoryManager(512 << 20, 0, 0, timeoutMs); + auto memoryManager = createMemoryManager(512 << 20, 0, timeoutMs); auto queryCtx = newQueryCtx(memoryManager.get(), executor_.get(), queryMemoryCapacity); diff --git a/velox/exec/tests/PrintPlanWithStatsTest.cpp b/velox/exec/tests/PrintPlanWithStatsTest.cpp index 9b52725c3fee..3539c301341b 100644 --- a/velox/exec/tests/PrintPlanWithStatsTest.cpp +++ b/velox/exec/tests/PrintPlanWithStatsTest.cpp @@ -152,6 +152,7 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) { {{"-- Project\\[4\\]\\[expressions: \\(c0:INTEGER, ROW\\[\"c0\"\\]\\), \\(p1:BIGINT, plus\\(ROW\\[\"c1\"\\],1\\)\\), \\(p2:BIGINT, plus\\(ROW\\[\"c1\"\\],ROW\\[\"u_c1\"\\]\\)\\)\\] -> c0:INTEGER, p1:BIGINT, p2:BIGINT"}, {" Output: 2000 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1"}, {" dataSourceLazyCpuNanos[ ]* sum: .+, count: .+, min: .+, max: .+"}, + {" dataSourceLazyInputBytes[ ]* sum: .+, count: .+, min: .+, max: .+"}, {" dataSourceLazyWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningFinishWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, @@ -269,6 +270,7 @@ TEST_F(PrintPlanWithStatsTest, partialAggregateWithTableScan) { {{"-- Aggregation\\[1\\]\\[PARTIAL \\[c5\\] a0 := max\\(ROW\\[\"c0\"\\]\\), a1 := sum\\(ROW\\[\"c1\"\\]\\), a2 := sum\\(ROW\\[\"c2\"\\]\\), a3 := sum\\(ROW\\[\"c3\"\\]\\), a4 := sum\\(ROW\\[\"c4\"\\]\\)\\] -> c5:VARCHAR, a0:BIGINT, a1:BIGINT, a2:BIGINT, a3:DOUBLE, a4:DOUBLE"}, {" Output: .+, Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1"}, {" dataSourceLazyCpuNanos\\s+sum: .+, count: .+, min: .+, max: .+"}, + {" dataSourceLazyInputBytes\\s+sum: .+, count: .+, min: .+, max: .+"}, {" dataSourceLazyWallNanos\\s+sum: .+, count: .+, min: .+, max: .+"}, {" distinctKey0\\s+sum: .+, count: 1, min: .+, max: .+"}, {" hashtable.capacity\\s+sum: 1252, count: 1, min: 1252, max: 1252"}, @@ -345,6 +347,7 @@ TEST_F(PrintPlanWithStatsTest, tableWriterWithTableScan) { {{R"(-- TableWrite\[1\]\[.+InsertTableHandle .+)"}, {" Output: .+, Physical written output: .+, Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1"}, {" 
dataSourceLazyCpuNanos\\s+sum: .+, count: .+, min: .+, max: .+"}, + {" dataSourceLazyInputBytes\\s+sum: .+, count: .+, min: .+, max: .+"}, {" dataSourceLazyWallNanos\\s+sum: .+, count: .+, min: .+, max: .+"}, {" numWrittenFiles\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index fd982240c2c6..85643fd4134d 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -1539,7 +1539,7 @@ TEST_F(TableScanTest, preloadingSplitClose) { latch.count_down(); }); } - ASSERT_EQ(Task::numCreatedTasks(), Task::numDeletedTasks()); + ASSERT_EQ(Task::numRunningTasks(), 0); auto task = assertQuery(tableScanNode(), filePaths, "SELECT * FROM tmp", 2); auto stats = getTableScanRuntimeStats(task); @@ -1547,9 +1547,8 @@ TEST_F(TableScanTest, preloadingSplitClose) { ASSERT_GT(stats.at("preloadedSplits").sum, 1); task.reset(); - // Once all task references are cleared, the count of deleted tasks should - // promptly match the count of created tasks. - ASSERT_EQ(Task::numCreatedTasks(), Task::numDeletedTasks()); + // Once all task references are cleared, all the tasks should be destroyed. + ASSERT_EQ(Task::numRunningTasks(), 0); // Clean blocking items in the IO thread pool. for (auto& baton : batons) { baton.post(); @@ -3816,7 +3815,12 @@ TEST_F(TableScanTest, structLazy) { .project({"cardinality(c2.c0)"}) .planNode(); - assertQuery(op, {filePath}, "select c0 % 3 from tmp"); + auto task = assertQuery(op, {filePath}, "select c0 % 3 from tmp"); + + // Ensure lazy stats are attributed to table scan. + const auto stats = task->taskStats(); + EXPECT_GT(stats.pipelineStats[0].operatorStats[0].inputBytes, 0); + EXPECT_GT(stats.pipelineStats[0].operatorStats[0].outputBytes, 0); } TEST_F(TableScanTest, interleaveLazyEager) { diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index 307d168a3f1a..84f4d23e1b41 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -466,8 +466,9 @@ class TaskTest : public HiveConnectorTestBase { core::PlanFragment plan, const std::unordered_map>& filePaths = {}) { + static std::atomic_uint64_t taskId{0}; auto task = Task::create( - "single.execution.task.0", + fmt::format("single.execution.task.{}", taskId++), plan, 0, core::QueryCtx::create(), @@ -743,15 +744,11 @@ TEST_F(TaskTest, serialExecution) { makeFlatVector(100, [](auto row) { return row + 5; }), }); - uint64_t numCreatedTasks = Task::numCreatedTasks(); - uint64_t numDeletedTasks = Task::numDeletedTasks(); { auto [task, results] = executeSerial(plan); assertEqualResults( std::vector{expectedResult, expectedResult}, results); } - ASSERT_EQ(numCreatedTasks + 1, Task::numCreatedTasks()); - ASSERT_EQ(numDeletedTasks + 1, Task::numDeletedTasks()); // Project + Aggregation. plan = PlanBuilder() @@ -776,14 +773,10 @@ TEST_F(TaskTest, serialExecution) { 995 / 2.0 + 4}), }); - ++numCreatedTasks; - ++numDeletedTasks; { auto [task, results] = executeSerial(plan); assertEqualResults({expectedResult}, results); } - ASSERT_EQ(numCreatedTasks + 1, Task::numCreatedTasks()); - ASSERT_EQ(numDeletedTasks + 1, Task::numDeletedTasks()); // Project + Aggregation over TableScan. auto filePath = TempFilePath::create(); @@ -808,6 +801,65 @@ TEST_F(TaskTest, serialExecution) { VELOX_ASSERT_THROW(executeSerial(plan), "division by zero"); } +// The purpose of the test is to check the running task list APIs. 
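Before the test that follows, one general observation: with the created/deleted counters gone, "all tasks destroyed" is now expressed as Task::numRunningTasks() == 0, which is what the assertions above check and what the reworked waitForAllTasksToBeDeleted() later in this diff polls for. A hedged sketch of such a polling helper, with an arbitrary default timeout and a hypothetical name:

#include <chrono>
#include <thread>

#include "velox/exec/Task.h"

namespace facebook::velox::exec::test {

// Illustrative sketch only: poll until every Task has been destroyed or the
// deadline expires. Returns true on success so callers can assert on it.
bool waitForZeroRunningTasks(
    std::chrono::milliseconds maxWait = std::chrono::seconds(3)) {
  const auto deadline = std::chrono::steady_clock::now() + maxWait;
  while (Task::numRunningTasks() != 0) {
    if (std::chrono::steady_clock::now() >= deadline) {
      return false;
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
  return true;
}

} // namespace facebook::velox::exec::test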
+TEST_F(TaskTest, runningTaskList) { + const auto data = makeRowVector({ + makeFlatVector(1'000, [](auto row) { return row; }), + }); + + ASSERT_EQ(Task::numRunningTasks(), 0); + ASSERT_TRUE(Task::getRunningTasks().empty()); + + const auto plan = PlanBuilder() + .values({data, data}) + .filter("c0 < 100") + .project({"c0 + 5"}) + .planFragment(); + + // This is to verify that runningTaskList API returns a completed task which + // still has pending references. + std::vector> expectedRunningTasks; + expectedRunningTasks.push_back(executeSerial(plan).first); + ASSERT_EQ(Task::numRunningTasks(), 1); + ASSERT_EQ(Task::getRunningTasks().size(), 1); + + expectedRunningTasks.push_back(executeSerial(plan).first); + ASSERT_EQ(Task::numRunningTasks(), 2); + ASSERT_EQ(Task::getRunningTasks().size(), 2); + + expectedRunningTasks.push_back(executeSerial(plan).first); + ASSERT_EQ(Task::numRunningTasks(), 3); + ASSERT_EQ(Task::getRunningTasks().size(), 3); + + std::set expectedTaskIdSet; + for (const auto& task : expectedRunningTasks) { + expectedTaskIdSet.insert(task->taskId()); + } + ASSERT_EQ(expectedTaskIdSet.size(), 3); + std::vector> runningTasks = Task::getRunningTasks(); + ASSERT_EQ(runningTasks.size(), 3); + for (const auto& task : runningTasks) { + ASSERT_EQ(expectedTaskIdSet.count(task->taskId()), 1); + } + + expectedTaskIdSet.erase(expectedRunningTasks.back()->taskId()); + expectedRunningTasks.pop_back(); + ASSERT_EQ(expectedTaskIdSet.size(), 2); + + runningTasks.clear(); + runningTasks = Task::getRunningTasks(); + ASSERT_EQ(runningTasks.size(), 2); + for (const auto& task : runningTasks) { + ASSERT_EQ(expectedTaskIdSet.count(task->taskId()), 1); + } + + runningTasks.clear(); + expectedRunningTasks.clear(); + + ASSERT_EQ(Task::numRunningTasks(), 0); + ASSERT_TRUE(Task::getRunningTasks().empty()); +} + TEST_F(TaskTest, serialHashJoin) { auto left = makeRowVector( {"t_c0", "t_c1"}, diff --git a/velox/exec/tests/WindowTest.cpp b/velox/exec/tests/WindowTest.cpp index 21df032f58a4..16b7294913a0 100644 --- a/velox/exec/tests/WindowTest.cpp +++ b/velox/exec/tests/WindowTest.cpp @@ -487,5 +487,41 @@ TEST_F(WindowTest, nagativeFrameArg) { } } +DEBUG_ONLY_TEST_F(WindowTest, frameColumnNullCheck) { + auto makePlan = [&](const RowVectorPtr& input) { + return PlanBuilder() + .values({input}) + .window( + {"sum(c0) OVER (PARTITION BY p0 ORDER BY s0 RANGE BETWEEN UNBOUNDED PRECEDING AND off0 FOLLOWING)"}) + .planNode(); + }; + + // Null values in order-by column 's0' and frame column 'off0' do not match, + // so exception is expected. + auto inputThrow = makeRowVector( + {"c0", "p0", "s0", "off0"}, + { + makeNullableFlatVector({1, std::nullopt, 1, 2, 2}), + makeFlatVector({1, 2, 1, 2, 1}), + makeNullableFlatVector({1, 2, 3, std::nullopt, 5}), + makeNullableFlatVector({2, std::nullopt, 4, 5, 6}), + }); + VELOX_ASSERT_THROW( + AssertQueryBuilder(makePlan(inputThrow)).copyResults(pool()), ""); + + // Null values in order-by column 's0' and frame column 'off0' match, so no + // exception should be thrown. 
+ auto inputNoThrow = makeRowVector( + {"c0", "p0", "s0", "off0"}, + { + makeNullableFlatVector({1, 1, 2, std::nullopt, 2}), + makeFlatVector({1, 1, 1, 2, 2}), + makeNullableFlatVector({1, std::nullopt, 2, 3, 5}), + makeNullableFlatVector({2, std::nullopt, 3, 4, 6}), + }); + ASSERT_NO_THROW( + AssertQueryBuilder(makePlan(inputNoThrow)).copyResults(pool())); +} + } // namespace } // namespace facebook::velox::exec diff --git a/velox/exec/tests/utils/ArbitratorTestUtil.cpp b/velox/exec/tests/utils/ArbitratorTestUtil.cpp index da19a289eb2c..160d4e5d228a 100644 --- a/velox/exec/tests/utils/ArbitratorTestUtil.cpp +++ b/velox/exec/tests/utils/ArbitratorTestUtil.cpp @@ -45,7 +45,6 @@ std::shared_ptr newQueryCtx( std::unique_ptr createMemoryManager( int64_t arbitratorCapacity, uint64_t memoryPoolInitCapacity, - uint64_t memoryPoolTransferCapacity, uint64_t maxReclaimWaitMs, uint64_t fastExponentialGrowthCapacityLimit, double slowCapacityGrowPct) { @@ -59,8 +58,6 @@ std::unique_ptr createMemoryManager( options.extraArbitratorConfigs = { {std::string(ExtraConfig::kMemoryPoolInitialCapacity), folly::to(memoryPoolInitCapacity) + "B"}, - {std::string(ExtraConfig::kMemoryPoolTransferCapacity), - folly::to(memoryPoolTransferCapacity) + "B"}, {std::string(ExtraConfig::kMemoryReclaimMaxWaitTime), folly::to(maxReclaimWaitMs) + "ms"}, {std::string(ExtraConfig::kGlobalArbitrationEnabled), "true"}, diff --git a/velox/exec/tests/utils/ArbitratorTestUtil.h b/velox/exec/tests/utils/ArbitratorTestUtil.h index 9aa50153b17d..3c1cb6191b98 100644 --- a/velox/exec/tests/utils/ArbitratorTestUtil.h +++ b/velox/exec/tests/utils/ArbitratorTestUtil.h @@ -33,7 +33,6 @@ constexpr int64_t MB = 1024L * KB; constexpr uint64_t kMemoryCapacity = 512 * MB; constexpr uint64_t kMemoryPoolInitCapacity = 16 * MB; -constexpr uint64_t kMemoryPoolTransferCapacity = 8 * MB; class FakeMemoryReclaimer : public exec::MemoryReclaimer { public: @@ -95,7 +94,6 @@ std::shared_ptr newQueryCtx( std::unique_ptr createMemoryManager( int64_t arbitratorCapacity = kMemoryCapacity, uint64_t memoryPoolInitCapacity = kMemoryPoolInitCapacity, - uint64_t memoryPoolTransferCapacity = kMemoryPoolTransferCapacity, uint64_t maxReclaimWaitMs = 0, uint64_t fastExponentialGrowthCapacityLimit = 0, double slowCapacityGrowPct = 0); diff --git a/velox/exec/tests/utils/QueryAssertions.cpp b/velox/exec/tests/utils/QueryAssertions.cpp index 50e7a668a94e..f7944e6a9dea 100644 --- a/velox/exec/tests/utils/QueryAssertions.cpp +++ b/velox/exec/tests/utils/QueryAssertions.cpp @@ -1474,48 +1474,28 @@ bool waitForTaskStateChange( } void waitForAllTasksToBeDeleted(uint64_t maxWaitUs) { - const uint64_t numCreatedTasks = Task::numCreatedTasks(); - uint64_t numDeletedTasks = Task::numDeletedTasks(); uint64_t waitUs = 0; - while (numCreatedTasks > numDeletedTasks) { + while (Task::numRunningTasks() != 0) { constexpr uint64_t kWaitInternalUs = 1'000; std::this_thread::sleep_for(std::chrono::microseconds(kWaitInternalUs)); waitUs += kWaitInternalUs; - numDeletedTasks = Task::numDeletedTasks(); if (waitUs >= maxWaitUs) { break; } } - VELOX_CHECK_EQ( - numDeletedTasks, - numCreatedTasks, - "{} tasks have been created while only {} have been deleted after waiting for {} us", - numCreatedTasks, - numDeletedTasks, - waitUs); -} - -void waitForAllTasksToBeDeleted( - uint64_t expectedDeletedTasks, - uint64_t maxWaitUs) { - uint64_t numDeletedTasks = Task::numDeletedTasks(); - uint64_t waitUs = 0; - while (expectedDeletedTasks > numDeletedTasks) { - constexpr uint64_t kWaitInternalUs = 1'000; 
- std::this_thread::sleep_for(std::chrono::microseconds(kWaitInternalUs)); - waitUs += kWaitInternalUs; - numDeletedTasks = Task::numDeletedTasks(); - if (waitUs >= maxWaitUs) { - break; - } + std::vector> pendingTasks = Task::getRunningTasks(); + if (pendingTasks.empty()) { + return; } - VELOX_CHECK_EQ( - numDeletedTasks, - expectedDeletedTasks, - "expected {} tasks to be deleted but only {} have been deleted after waiting for {} us", - expectedDeletedTasks, - numDeletedTasks, - waitUs); + std::vector pendingTaskStats; + pendingTaskStats.reserve(pendingTasks.size()); + for (const auto& task : pendingTasks) { + pendingTaskStats.push_back(task->toString()); + } + VELOX_FAIL( + "{} pending tasks\n{}", + pendingTasks.size(), + folly::join("\n", pendingTaskStats)); } std::shared_ptr assertQuery( diff --git a/velox/exec/tests/utils/QueryAssertions.h b/velox/exec/tests/utils/QueryAssertions.h index 217d12351e32..d1bbe253a2ff 100644 --- a/velox/exec/tests/utils/QueryAssertions.h +++ b/velox/exec/tests/utils/QueryAssertions.h @@ -213,12 +213,6 @@ bool waitForTaskStateChange( /// during this wait call. This is for testing purpose for now. void waitForAllTasksToBeDeleted(uint64_t maxWaitUs = 3'000'000); -/// Similar to above test utility except waiting for a specific number of -/// tasks to be deleted. -void waitForAllTasksToBeDeleted( - uint64_t expectedDeletedTasks, - uint64_t maxWaitUs); - std::shared_ptr assertQuery( const core::PlanNodePtr& plan, const std::string& duckDbSql, diff --git a/velox/exec/trace/test/QueryTraceTest.cpp b/velox/exec/trace/test/QueryTraceTest.cpp index 8e8a2a6405b8..5c3122aefc06 100644 --- a/velox/exec/trace/test/QueryTraceTest.cpp +++ b/velox/exec/trace/test/QueryTraceTest.cpp @@ -415,6 +415,108 @@ TEST_F(QueryTracerTest, traceDir) { ASSERT_EQ(expectedDirs.count(dir), 1); } } + +TEST_F(QueryTracerTest, traceTableWriter) { + const auto rowType = ROW({"a", "b", "c"}, {BIGINT(), BIGINT(), BIGINT()}); + std::vector inputVectors; + constexpr auto numBatch = 5; + inputVectors.reserve(numBatch); + for (auto i = 0; i < numBatch; ++i) { + inputVectors.push_back(vectorFuzzer_.fuzzInputFlatRow(rowType)); + } + + struct { + std::string taskRegExpr; + uint64_t maxTracedBytes; + uint8_t numTracedBatches; + bool limitExceeded; + + std::string debugString() const { + return fmt::format( + "taskRegExpr: {}, maxTracedBytes: {}, numTracedBatches: {}, limitExceeded {}", + taskRegExpr, + maxTracedBytes, + numTracedBatches, + limitExceeded); + } + } testSettings[]{ + {".*", 10UL << 30, numBatch, false}, + {".*", 0, numBatch, false}, + {"wrong id", 10UL << 30, 0, false}, + {"test_cursor \\d+", 10UL << 30, numBatch, false}, + {"test_cursor \\d+", 800, 2, true}}; + + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + const auto outputDir = TempDirectoryPath::create(); + const auto planNode = PlanBuilder() + .values(inputVectors) + .tableWrite(outputDir->getPath()) + .planNode(); + const auto testDir = TempDirectoryPath::create(); + const auto traceRoot = + fmt::format("{}/{}", testDir->getPath(), "traceRoot"); + std::shared_ptr task; + AssertQueryBuilder(planNode) + .maxDrivers(1) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceRoot) + .config(core::QueryConfig::kQueryTraceMaxBytes, testData.maxTracedBytes) + .config(core::QueryConfig::kQueryTraceTaskRegExp, testData.taskRegExpr) + .config(core::QueryConfig::kQueryTraceNodeIds, "1") + .copyResults(pool(), task); + + const auto metadataDir = 
fmt::format("{}/{}", traceRoot, task->taskId()); + const auto fs = filesystems::getFileSystem(metadataDir, nullptr); + + if (testData.taskRegExpr == "wrong id") { + ASSERT_FALSE(fs->exists(traceRoot)); + continue; + } + + // Query metadta file should exist. + const auto traceMetaFile = fmt::format( + "{}/{}/{}", + traceRoot, + task->taskId(), + trace::QueryTraceTraits::kQueryMetaFileName); + ASSERT_TRUE(fs->exists(traceMetaFile)); + + const auto dataDir = + fmt::format("{}/{}/{}", traceRoot, task->taskId(), "1/0/0/data"); + + // Query data tracing disabled. + if (testData.maxTracedBytes == 0) { + ASSERT_FALSE(fs->exists(dataDir)); + continue; + } + + ASSERT_EQ(fs->list(dataDir).size(), 2); + // Check data summaries. + const auto summaryFile = fs->openFileForRead( + fmt::format("{}/{}", dataDir, QueryTraceTraits::kDataSummaryFileName)); + const auto summary = summaryFile->pread(0, summaryFile->size()); + ASSERT_FALSE(summary.empty()); + folly::dynamic obj = folly::parseJson(summary); + ASSERT_EQ( + obj[QueryTraceTraits::kTraceLimitExceededKey].asBool(), + testData.limitExceeded); + + const auto reader = trace::QueryDataReader(dataDir, pool()); + RowVectorPtr actual; + size_t numOutputVectors{0}; + while (reader.read(actual)) { + const auto expected = inputVectors[numOutputVectors]; + const auto size = actual->size(); + ASSERT_EQ(size, expected->size()); + for (auto i = 0; i < size; ++i) { + actual->compare(expected.get(), i, i, {.nullsFirst = true}); + } + ++numOutputVectors; + } + ASSERT_EQ(numOutputVectors, testData.numTracedBatches); + } +} } // namespace facebook::velox::exec::trace::test // This main is needed for some tests on linux. diff --git a/velox/expression/tests/ExpressionVerifier.cpp b/velox/expression/tests/ExpressionVerifier.cpp index d22ec7cbde9b..2dbeea206135 100644 --- a/velox/expression/tests/ExpressionVerifier.cpp +++ b/velox/expression/tests/ExpressionVerifier.cpp @@ -177,8 +177,9 @@ fuzzer::ResultOrError ExpressionVerifier::verify( "Input after simplified"); } catch (const VeloxException& e) { - if (!e.isUserError() && - e.errorCode() != error_code::kUnsupportedInputUncatchable) { + if (e.errorCode() == error_code::kUnsupportedInputUncatchable) { + unsupportedInputUncatchableError = true; + } else if (!e.isUserError()) { LOG(ERROR) << "Simplified eval: VeloxRuntimeErrors other than UNSUPPORTED_INPUT_UNCATCHABLE error are not allowed."; persistReproInfoIfNeeded( @@ -197,10 +198,19 @@ fuzzer::ResultOrError ExpressionVerifier::verify( try { // Compare results or exceptions (if any). Fail if anything is different. if (exceptionCommonPtr || exceptionSimplifiedPtr) { - // Throws in case exceptions are not compatible. If they are compatible, - // return false to signal that the expression failed. - fuzzer::compareExceptions(exceptionCommonPtr, exceptionSimplifiedPtr); - return {nullptr, exceptionCommonPtr, unsupportedInputUncatchableError}; + // UNSUPPORTED_INPUT_UNCATCHABLE errors are VeloxRuntimeErrors that cannot + // be suppressed by default NULLs. So it may happen that only one of the + // common and simplified path throws this error. In this case, we do not + // compare the exceptions. + if (!unsupportedInputUncatchableError) { + // Throws in case exceptions are not compatible. If they are compatible, + // return false to signal that the expression failed. + fuzzer::compareExceptions(exceptionCommonPtr, exceptionSimplifiedPtr); + } + return { + nullptr, + exceptionCommonPtr ? 
exceptionCommonPtr : exceptionSimplifiedPtr, + unsupportedInputUncatchableError}; } else { // Throws in case output is different. VELOX_CHECK_EQ(commonEvalResult.size(), plans.size()); diff --git a/velox/functions/prestosql/InPredicate.cpp b/velox/functions/prestosql/InPredicate.cpp index 86d7fdeacb20..0819cd0d69f1 100644 --- a/velox/functions/prestosql/InPredicate.cpp +++ b/velox/functions/prestosql/InPredicate.cpp @@ -20,36 +20,39 @@ namespace facebook::velox::functions { namespace { +// This implements InPredicate using a set over VectorValues (pairs of +// BaseVector, index). Can be used in place of Filters for Types not supported +// by Filters or when custom comparisons are needed. // Returns NULL if // - input value is NULL // - in-list is NULL or empty // - input value doesn't have an exact match, but has an indeterminate match in // the in-list. E.g., 'array[null] in (array[1])' or 'array[1] in // (array[null])'. -class ComplexTypeInPredicate : public exec::VectorFunction { +class VectorSetInPredicate : public exec::VectorFunction { public: - struct ComplexValue { + struct VectorValue { BaseVector* vector; vector_size_t index; }; - struct ComplexValueHash { - size_t operator()(ComplexValue value) const { + struct VectorValueHash { + size_t operator()(VectorValue value) const { return value.vector->hashValueAt(value.index); } }; - struct ComplexValueEqualTo { - bool operator()(ComplexValue left, ComplexValue right) const { + struct VectorValueEqualTo { + bool operator()(VectorValue left, VectorValue right) const { return left.vector->equalValueAt(right.vector, left.index, right.index); } }; - using ComplexSet = - folly::F14FastSet; + using VectorSet = + folly::F14FastSet; - ComplexTypeInPredicate( - ComplexSet uniqueValues, + VectorSetInPredicate( + VectorSet uniqueValues, bool hasNull, VectorPtr originalValues) : uniqueValues_{std::move(uniqueValues)}, @@ -58,7 +61,7 @@ class ComplexTypeInPredicate : public exec::VectorFunction { static std::shared_ptr create(const VectorPtr& values, vector_size_t offset, vector_size_t size) { - ComplexSet uniqueValues; + VectorSet uniqueValues; bool hasNull = false; for (auto i = offset; i < offset + size; i++) { @@ -68,7 +71,7 @@ class ComplexTypeInPredicate : public exec::VectorFunction { uniqueValues.insert({values.get(), i}); } - return std::make_shared( + return std::make_shared( std::move(uniqueValues), hasNull, values); } @@ -126,7 +129,7 @@ class ComplexTypeInPredicate : public exec::VectorFunction { // Set of unique values to check against. This set doesn't include any value // that is null or contains null. - const ComplexSet uniqueValues_; + const VectorSet uniqueValues_; // Boolean indicating whether one of the value was null or contained null. 
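The VectorSetInPredicate above keys a folly::F14FastSet on (vector, row index) pairs and delegates hashing and equality to BaseVector::hashValueAt() and equalValueAt(), which is why it also works for types that provide custom comparison. Below is a stripped-down sketch of that technique under the assumption that the in-list values live in a BaseVector; RowRef, buildValueSet() and containsValue() are illustrative names, not Velox APIs.

#include <folly/container/F14Set.h>

#include "velox/vector/BaseVector.h"

namespace {

using facebook::velox::BaseVector;
using facebook::velox::vector_size_t;

struct RowRef {
  const BaseVector* vector;
  vector_size_t index;
};

struct RowRefHash {
  size_t operator()(const RowRef& v) const {
    // Delegates to the vector, so custom-comparable types hash consistently.
    return v.vector->hashValueAt(v.index);
  }
};

struct RowRefEqual {
  bool operator()(const RowRef& a, const RowRef& b) const {
    return a.vector->equalValueAt(b.vector, a.index, b.index);
  }
};

using RowRefSet = folly::F14FastSet<RowRef, RowRefHash, RowRefEqual>;

// Build a set over the rows of 'values' that do not contain nulls.
RowRefSet buildValueSet(const BaseVector& values) {
  RowRefSet set;
  for (vector_size_t i = 0; i < values.size(); ++i) {
    if (!values.containsNullAt(i)) {
      set.insert(RowRef{&values, i});
    }
  }
  return set;
}

// Check whether row 'row' of 'probe' is present in 'set'.
bool containsValue(const RowRefSet& set, const BaseVector& probe, vector_size_t row) {
  return set.count(RowRef{&probe, row}) > 0;
}

} // namespace

Because the set stores raw pointers plus row indices, the in-list vector must outlive the set; the class above handles this by keeping the original values vector alive alongside uniqueValues_.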
const bool hasNull_; @@ -339,10 +342,15 @@ class InPredicate : public exec::VectorFunction { } const auto& elements = arrayVector->elements(); + const auto& elementType = elements->type(); + + if (elementType->providesCustomComparison()) { + return VectorSetInPredicate::create(elements, offset, size); + } std::pair, bool> filter; - switch (inListType->childAt(0)->kind()) { + switch (elementType->kind()) { case TypeKind::HUGEINT: filter = createHugeintValuesFilter(elements, offset, size); break; @@ -384,7 +392,7 @@ class InPredicate : public exec::VectorFunction { case TypeKind::MAP: [[fallthrough]]; case TypeKind::ROW: - return ComplexTypeInPredicate::create(elements, offset, size); + return VectorSetInPredicate::create(elements, offset, size); default: VELOX_UNSUPPORTED( "Unsupported in-list type for IN predicate: {}", diff --git a/velox/functions/prestosql/aggregates/CMakeLists.txt b/velox/functions/prestosql/aggregates/CMakeLists.txt index 4e467bab1f66..986de438474e 100644 --- a/velox/functions/prestosql/aggregates/CMakeLists.txt +++ b/velox/functions/prestosql/aggregates/CMakeLists.txt @@ -36,7 +36,8 @@ velox_add_library( MapUnionSumAggregate.cpp MaxSizeForStatsAggregate.cpp MinMaxAggregates.cpp - MinMaxByAggregates.cpp + MaxByAggregate.cpp + MinByAggregate.cpp MultiMapAggAggregate.cpp PrestoHasher.cpp ReduceAgg.cpp diff --git a/velox/functions/prestosql/aggregates/MaxByAggregate.cpp b/velox/functions/prestosql/aggregates/MaxByAggregate.cpp new file mode 100644 index 000000000000..60cc72f008b4 --- /dev/null +++ b/velox/functions/prestosql/aggregates/MaxByAggregate.cpp @@ -0,0 +1,53 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/functions/prestosql/aggregates/AggregateNames.h" +#include "velox/functions/prestosql/aggregates/MinMaxByAggregateBase.h" + +namespace facebook::velox::aggregate::prestosql { + +template +class MaxByNAggregate : public MinMaxByNAggregate> { + public: + explicit MaxByNAggregate(TypePtr resultType) + : MinMaxByNAggregate>(resultType) {} +}; + +template +class MaxByNAggregate + : public MinMaxByNAggregate< + ComplexType, + C, + Greater> { + public: + explicit MaxByNAggregate(TypePtr resultType) + : MinMaxByNAggregate< + ComplexType, + C, + Greater>(resultType) {} +}; + +void registerMaxByAggregates( + const std::string& prefix, + bool withCompanionFunctions, + bool overwrite) { + registerMinMaxBy< + functions::aggregate::MinMaxByAggregateBase, + true, + MaxByNAggregate>(prefix + kMaxBy, withCompanionFunctions, overwrite); +} + +} // namespace facebook::velox::aggregate::prestosql diff --git a/velox/functions/prestosql/aggregates/MinByAggregate.cpp b/velox/functions/prestosql/aggregates/MinByAggregate.cpp new file mode 100644 index 000000000000..6d47f73867dd --- /dev/null +++ b/velox/functions/prestosql/aggregates/MinByAggregate.cpp @@ -0,0 +1,53 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/functions/prestosql/aggregates/AggregateNames.h" +#include "velox/functions/prestosql/aggregates/MinMaxByAggregateBase.h" + +namespace facebook::velox::aggregate::prestosql { + +template +class MinByNAggregate : public MinMaxByNAggregate> { + public: + explicit MinByNAggregate(TypePtr resultType) + : MinMaxByNAggregate>(resultType) {} +}; + +template +class MinByNAggregate + : public MinMaxByNAggregate< + ComplexType, + C, + Less> { + public: + explicit MinByNAggregate(TypePtr resultType) + : MinMaxByNAggregate< + ComplexType, + C, + Less>(resultType) {} +}; + +void registerMinByAggregates( + const std::string& prefix, + bool withCompanionFunctions, + bool overwrite) { + registerMinMaxBy< + functions::aggregate::MinMaxByAggregateBase, + false, + MinByNAggregate>(prefix + kMinBy, withCompanionFunctions, overwrite); +} + +} // namespace facebook::velox::aggregate::prestosql diff --git a/velox/functions/prestosql/aggregates/MinMaxByAggregates.cpp b/velox/functions/prestosql/aggregates/MinMaxByAggregateBase.h similarity index 95% rename from velox/functions/prestosql/aggregates/MinMaxByAggregates.cpp rename to velox/functions/prestosql/aggregates/MinMaxByAggregateBase.h index 71f020ca398c..5f6c57690594 100644 --- a/velox/functions/prestosql/aggregates/MinMaxByAggregates.cpp +++ b/velox/functions/prestosql/aggregates/MinMaxByAggregateBase.h @@ -21,12 +21,8 @@ #include "velox/functions/prestosql/aggregates/AggregateNames.h" #include "velox/type/FloatingPointUtil.h" -using namespace facebook::velox::functions::aggregate; - namespace facebook::velox::aggregate::prestosql { -namespace { - /// Returns true if the value in 'index' row of 'newComparisons' is strictly /// greater than or less than the value in the 'accumulator'. template @@ -36,7 +32,7 @@ struct Comparator { const DecodedVector& newComparisons, vector_size_t index, bool isFirstValue) { - if constexpr (isNumeric()) { + if constexpr (functions::aggregate::isNumeric()) { if (isFirstValue) { return true; } @@ -1017,48 +1013,6 @@ class MinMaxByNAggregate : public exec::Aggregate { DecodedVector decodedIntermediates_; }; -template -class MinByNAggregate : public MinMaxByNAggregate> { - public: - explicit MinByNAggregate(TypePtr resultType) - : MinMaxByNAggregate>(resultType) {} -}; - -template -class MinByNAggregate - : public MinMaxByNAggregate< - ComplexType, - C, - Less> { - public: - explicit MinByNAggregate(TypePtr resultType) - : MinMaxByNAggregate< - ComplexType, - C, - Less>(resultType) {} -}; - -template -class MaxByNAggregate : public MinMaxByNAggregate> { - public: - explicit MaxByNAggregate(TypePtr resultType) - : MinMaxByNAggregate>(resultType) {} -}; - -template -class MaxByNAggregate - : public MinMaxByNAggregate< - ComplexType, - C, - Greater> { - public: - explicit MaxByNAggregate(TypePtr resultType) - : MinMaxByNAggregate< - ComplexType, - C, - Greater>(resultType) {} -}; - template