From 70c108737d0b7355b27c7b461a386c5ed2987552 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 29 Jan 2025 00:41:04 -0500 Subject: [PATCH 1/6] Make query async --- cpp/cmake/rapids_config.cmake | 5 ++++- cpp/include/kvikio/defaults.hpp | 6 +++-- cpp/include/kvikio/parallel_operation.hpp | 27 +++++++++++++++-------- cpp/include/kvikio/utils.hpp | 2 ++ cpp/src/defaults.cpp | 2 +- cpp/src/file_handle.cpp | 22 +++++++++--------- 6 files changed, 41 insertions(+), 23 deletions(-) diff --git a/cpp/cmake/rapids_config.cmake b/cpp/cmake/rapids_config.cmake index c3a6a0ef13..32b8b6ace1 100644 --- a/cpp/cmake/rapids_config.cmake +++ b/cpp/cmake/rapids_config.cmake @@ -1,5 +1,5 @@ # ============================================================================= -# Copyright (c) 2018-2024, NVIDIA CORPORATION. +# Copyright (c) 2018-2025, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except # in compliance with the License. You may obtain a copy of the License at @@ -11,6 +11,9 @@ # or implied. See the License for the specific language governing permissions and limitations under # the License. # ============================================================================= +set(rapids-cmake-repo "kingcrimsontianyu/rapids-cmake") +set(rapids-cmake-branch "bump-bs-threadpool-version-5.0.0") + file(READ "${CMAKE_CURRENT_LIST_DIR}/../../VERSION" _rapids_version) if(_rapids_version MATCHES [[^([0-9][0-9])\.([0-9][0-9])\.([0-9][0-9])]]) set(RAPIDS_VERSION_MAJOR "${CMAKE_MATCH_1}") diff --git a/cpp/include/kvikio/defaults.hpp b/cpp/include/kvikio/defaults.hpp index 501c71981a..d4e533bf68 100644 --- a/cpp/include/kvikio/defaults.hpp +++ b/cpp/include/kvikio/defaults.hpp @@ -81,13 +81,15 @@ bool getenv_or(std::string_view env_var_name, bool default_val); template <> CompatMode getenv_or(std::string_view env_var_name, CompatMode default_val); +using BS_thread_pool = BS::thread_pool; + /** * @brief Singleton class of default values used throughout KvikIO. * */ class defaults { private: - BS::thread_pool _thread_pool{get_num_threads_from_env()}; + BS_thread_pool _thread_pool{get_num_threads_from_env()}; CompatMode _compat_mode; std::size_t _task_size; std::size_t _gds_threshold; @@ -183,7 +185,7 @@ class defaults { * * @return The the default thread pool instance. */ - [[nodiscard]] static BS::thread_pool& thread_pool(); + [[nodiscard]] static BS_thread_pool& thread_pool(); /** * @brief Get the number of threads in the default thread pool. diff --git a/cpp/include/kvikio/parallel_operation.hpp b/cpp/include/kvikio/parallel_operation.hpp index f345333c4f..dbc22fe8a2 100644 --- a/cpp/include/kvikio/parallel_operation.hpp +++ b/cpp/include/kvikio/parallel_operation.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2024, NVIDIA CORPORATION. + * Copyright (c) 2021-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ #include #include +#include #include #include #include @@ -38,6 +39,13 @@ std::future submit_task( [=] { return op(buf, size, file_offset, devPtr_offset); }); } +template +auto make_copyable_lambda(F&& f) +{ + auto sp = std::make_shared(std::forward(f)); + return [sp]() -> decltype(auto) { return (*sp)(); }; +} + } // namespace detail /** @@ -83,14 +91,15 @@ std::future parallel_io(F op, if (size > 0) { tasks.push_back(detail::submit_task(op, buf, size, file_offset, devPtr_offset)); } // Finally, we sum the result of all tasks. - auto gather_tasks = [](std::vector>&& tasks) -> std::size_t { - std::size_t ret = 0; - for (auto& task : tasks) { - ret += task.get(); - } - return ret; - }; - return std::async(std::launch::deferred, gather_tasks, std::move(tasks)); + auto gather_tasks = + detail::make_copyable_lambda([tasks = std::move(tasks)]() mutable -> std::size_t { + std::size_t ret = 0; + for (auto& task : tasks) { + ret += task.get(); + } + return ret; + }); + return defaults::thread_pool().submit_task(std::move(gather_tasks)); } } // namespace kvikio diff --git a/cpp/include/kvikio/utils.hpp b/cpp/include/kvikio/utils.hpp index b10e54c482..41b50f7108 100644 --- a/cpp/include/kvikio/utils.hpp +++ b/cpp/include/kvikio/utils.hpp @@ -15,6 +15,7 @@ */ #pragma once +#include #include #include #include @@ -152,6 +153,7 @@ std::tuple get_alloc_info(void const* devPtr, template bool is_future_done(T const& future) { + assert(future.valid()); return future.wait_for(std::chrono::seconds(0)) != std::future_status::timeout; } diff --git a/cpp/src/defaults.cpp b/cpp/src/defaults.cpp index f005e86d0b..0e49644751 100644 --- a/cpp/src/defaults.cpp +++ b/cpp/src/defaults.cpp @@ -163,7 +163,7 @@ bool defaults::is_compat_mode_preferred(CompatMode compat_mode) noexcept bool defaults::is_compat_mode_preferred() { return is_compat_mode_preferred(compat_mode()); } -BS::thread_pool& defaults::thread_pool() { return instance()->_thread_pool; } +BS_thread_pool& defaults::thread_pool() { return instance()->_thread_pool; } unsigned int defaults::thread_pool_nthreads() { return thread_pool().get_thread_count(); } diff --git a/cpp/src/file_handle.cpp b/cpp/src/file_handle.cpp index 0e65afb7fd..882799e016 100644 --- a/cpp/src/file_handle.cpp +++ b/cpp/src/file_handle.cpp @@ -210,11 +210,12 @@ std::future FileHandle::pread(void* buf, // Shortcut that circumvent the threadpool and use the POSIX backend directly. if (size < gds_threshold) { - auto task = [this, ctx, buf, size, file_offset]() -> std::size_t { - PushAndPopContext c(ctx); - return detail::posix_device_read(_fd_direct_off.fd(), buf, size, file_offset, 0); - }; - return std::async(std::launch::deferred, task); + PushAndPopContext c(ctx); + auto bytes_read = detail::posix_device_read(_fd_direct_off.fd(), buf, size, file_offset, 0); + std::promise read_promise; + auto read_future = read_promise.get_future(); + read_promise.set_value(bytes_read); + return read_future; } // Let's synchronize once instead of in each task. @@ -260,11 +261,12 @@ std::future FileHandle::pwrite(void const* buf, // Shortcut that circumvent the threadpool and use the POSIX backend directly. if (size < gds_threshold) { - auto task = [this, ctx, buf, size, file_offset]() -> std::size_t { - PushAndPopContext c(ctx); - return detail::posix_device_write(_fd_direct_off.fd(), buf, size, file_offset, 0); - }; - return std::async(std::launch::deferred, task); + PushAndPopContext c(ctx); + auto bytes_write = detail::posix_device_write(_fd_direct_off.fd(), buf, size, file_offset, 0); + std::promise write_promise; + auto write_future = write_promise.get_future(); + write_promise.set_value(bytes_write); + return write_future; } // Let's synchronize once instead of in each task. From 75f7a121916f4955567216c8d7010e65c361b446 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 29 Jan 2025 01:40:45 -0500 Subject: [PATCH 2/6] Revert the bs thread pool change --- cpp/cmake/rapids_config.cmake | 5 +---- cpp/include/kvikio/defaults.hpp | 6 ++---- cpp/src/defaults.cpp | 2 +- 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/cpp/cmake/rapids_config.cmake b/cpp/cmake/rapids_config.cmake index 32b8b6ace1..c3a6a0ef13 100644 --- a/cpp/cmake/rapids_config.cmake +++ b/cpp/cmake/rapids_config.cmake @@ -1,5 +1,5 @@ # ============================================================================= -# Copyright (c) 2018-2025, NVIDIA CORPORATION. +# Copyright (c) 2018-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except # in compliance with the License. You may obtain a copy of the License at @@ -11,9 +11,6 @@ # or implied. See the License for the specific language governing permissions and limitations under # the License. # ============================================================================= -set(rapids-cmake-repo "kingcrimsontianyu/rapids-cmake") -set(rapids-cmake-branch "bump-bs-threadpool-version-5.0.0") - file(READ "${CMAKE_CURRENT_LIST_DIR}/../../VERSION" _rapids_version) if(_rapids_version MATCHES [[^([0-9][0-9])\.([0-9][0-9])\.([0-9][0-9])]]) set(RAPIDS_VERSION_MAJOR "${CMAKE_MATCH_1}") diff --git a/cpp/include/kvikio/defaults.hpp b/cpp/include/kvikio/defaults.hpp index d4e533bf68..501c71981a 100644 --- a/cpp/include/kvikio/defaults.hpp +++ b/cpp/include/kvikio/defaults.hpp @@ -81,15 +81,13 @@ bool getenv_or(std::string_view env_var_name, bool default_val); template <> CompatMode getenv_or(std::string_view env_var_name, CompatMode default_val); -using BS_thread_pool = BS::thread_pool; - /** * @brief Singleton class of default values used throughout KvikIO. * */ class defaults { private: - BS_thread_pool _thread_pool{get_num_threads_from_env()}; + BS::thread_pool _thread_pool{get_num_threads_from_env()}; CompatMode _compat_mode; std::size_t _task_size; std::size_t _gds_threshold; @@ -185,7 +183,7 @@ class defaults { * * @return The the default thread pool instance. */ - [[nodiscard]] static BS_thread_pool& thread_pool(); + [[nodiscard]] static BS::thread_pool& thread_pool(); /** * @brief Get the number of threads in the default thread pool. diff --git a/cpp/src/defaults.cpp b/cpp/src/defaults.cpp index 0e49644751..f005e86d0b 100644 --- a/cpp/src/defaults.cpp +++ b/cpp/src/defaults.cpp @@ -163,7 +163,7 @@ bool defaults::is_compat_mode_preferred(CompatMode compat_mode) noexcept bool defaults::is_compat_mode_preferred() { return is_compat_mode_preferred(compat_mode()); } -BS_thread_pool& defaults::thread_pool() { return instance()->_thread_pool; } +BS::thread_pool& defaults::thread_pool() { return instance()->_thread_pool; } unsigned int defaults::thread_pool_nthreads() { return thread_pool().get_thread_count(); } From d30e0a02ed8a29acec56533d5afcb24a9acc28b2 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Thu, 30 Jan 2025 00:26:49 -0500 Subject: [PATCH 3/6] Improve implementation slightly --- cpp/include/kvikio/parallel_operation.hpp | 84 ++++++++++++++++++----- 1 file changed, 67 insertions(+), 17 deletions(-) diff --git a/cpp/include/kvikio/parallel_operation.hpp b/cpp/include/kvikio/parallel_operation.hpp index dbc22fe8a2..0449da7f6f 100644 --- a/cpp/include/kvikio/parallel_operation.hpp +++ b/cpp/include/kvikio/parallel_operation.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -31,19 +32,62 @@ namespace kvikio { namespace detail { -template -std::future submit_task( - F op, T buf, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset) +/** + * @brief Utility function to create a copyable callable from a move-only callable. + * + * The underlying thread pool uses `std::function` (until C++23) or `std::move_only_function` + * (since C++23) as the element type of the task queue. For the former case that currently applies, + * the `std::function` requires its "target" (associated callable) to be copy-constructible. This + * utility function is a workaround for those move-only callables. + * + * @tparam F Callable type. F shall be move-only. + * @param op Callable. + * @return A new callable that satisfies the copy-constructible condition. + */ +template +auto make_copyable_lambda(F op) { - return defaults::thread_pool().submit_task( - [=] { return op(buf, size, file_offset, devPtr_offset); }); + static_assert(std::is_move_constructible_v); + + // Create the callable on the heap by moving from f. Use a shared pointer to manage its lifetime. + auto sp = std::make_shared(std::forward(op)); + + // Use the copyable closure as the proxy of the move-only callable. + return + [sp](auto&&... args) -> decltype(auto) { return (*sp)(std::forward(args)...); }; } +/** + * @brief Submit the task callable to the underlying thread pool. + * + * Both the callable and arguments shall satisfy copy-constructible. + * + * @tparam F Callable type. + * @tparam Args Argument type. + * @param op Callable. + * @param args Arguments to the callable. + * @return A future to be used later to check if the operation has finished its execution. + */ +template +std::future submit_task(F&& op, Args&&... args) +{ + static_assert(std::is_invocable_r_v, Args...>); + return defaults::thread_pool().submit_task([=] { return op(args...); }); +} + +/** + * @brief Submit the move-only task callable to the underlying thread pool. + * + * @tparam F Callable type. F shall be move-only and have no argument. + * @param op Callable. + * @return A future to be used later to check if the operation has finished its execution. + */ template -auto make_copyable_lambda(F&& f) +std::future submit_move_only_task(F op_move_only) { - auto sp = std::make_shared(std::forward(f)); - return [sp]() -> decltype(auto) { return (*sp)(); }; + static_assert(std::is_invocable_r_v); + auto op_copyable = make_copyable_lambda(std::move(op_move_only)); + return defaults::thread_pool().submit_task(op_copyable); } } // namespace detail @@ -68,6 +112,13 @@ std::future parallel_io(F op, std::size_t task_size, std::size_t devPtr_offset) { + static_assert(std::is_invocable_r_v); + if (task_size == 0) { throw std::invalid_argument("`task_size` cannot be zero"); } // Single-task guard @@ -91,15 +142,14 @@ std::future parallel_io(F op, if (size > 0) { tasks.push_back(detail::submit_task(op, buf, size, file_offset, devPtr_offset)); } // Finally, we sum the result of all tasks. - auto gather_tasks = - detail::make_copyable_lambda([tasks = std::move(tasks)]() mutable -> std::size_t { - std::size_t ret = 0; - for (auto& task : tasks) { - ret += task.get(); - } - return ret; - }); - return defaults::thread_pool().submit_task(std::move(gather_tasks)); + auto gather_tasks = [tasks = std::move(tasks)]() mutable -> std::size_t { + std::size_t ret = 0; + for (auto& task : tasks) { + ret += task.get(); + } + return ret; + }; + return detail::submit_move_only_task(std::move(gather_tasks)); } } // namespace kvikio From c8f2e1b40c371f91468786964f5138919114e308 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Thu, 30 Jan 2025 00:34:31 -0500 Subject: [PATCH 4/6] Cleanup --- cpp/include/kvikio/parallel_operation.hpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/cpp/include/kvikio/parallel_operation.hpp b/cpp/include/kvikio/parallel_operation.hpp index 0449da7f6f..441c15e800 100644 --- a/cpp/include/kvikio/parallel_operation.hpp +++ b/cpp/include/kvikio/parallel_operation.hpp @@ -47,10 +47,8 @@ namespace detail { template auto make_copyable_lambda(F op) { - static_assert(std::is_move_constructible_v); - - // Create the callable on the heap by moving from f. Use a shared pointer to manage its lifetime. - auto sp = std::make_shared(std::forward(op)); + // Create the callable on the heap by moving from op. Use a shared pointer to manage its lifetime. + auto sp = std::make_shared(std::move(op)); // Use the copyable closure as the proxy of the move-only callable. return From 25d8850d54f17a8eb5f667a83cbfc73b0b426a2f Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Thu, 30 Jan 2025 00:39:44 -0500 Subject: [PATCH 5/6] Minor fix --- cpp/include/kvikio/parallel_operation.hpp | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/cpp/include/kvikio/parallel_operation.hpp b/cpp/include/kvikio/parallel_operation.hpp index 441c15e800..b69f67b37a 100644 --- a/cpp/include/kvikio/parallel_operation.hpp +++ b/cpp/include/kvikio/parallel_operation.hpp @@ -110,12 +110,7 @@ std::future parallel_io(F op, std::size_t task_size, std::size_t devPtr_offset) { - static_assert(std::is_invocable_r_v); + static_assert(std::is_invocable_r_v); if (task_size == 0) { throw std::invalid_argument("`task_size` cannot be zero"); } From 1691bd035f0b5b6288e78bff65d7c28a51bb2289 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Thu, 6 Feb 2025 15:31:13 -0500 Subject: [PATCH 6/6] Address reviewer comment. Also minor improvements --- cpp/include/kvikio/utils.hpp | 35 ++++++++++++++++++++++++++++++++++- cpp/src/file_handle.cpp | 15 +++++++-------- 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/cpp/include/kvikio/utils.hpp b/cpp/include/kvikio/utils.hpp index 41b50f7108..8194310198 100644 --- a/cpp/include/kvikio/utils.hpp +++ b/cpp/include/kvikio/utils.hpp @@ -150,10 +150,43 @@ class PushAndPopContext { std::tuple get_alloc_info(void const* devPtr, CUcontext* ctx = nullptr); +/** + * @brief Create a shared state in a future object that is immediately ready. + * + * A partial implementation of the namesake function from the concurrency TS + * (https://en.cppreference.com/w/cpp/experimental/make_ready_future). The cases of + * std::reference_wrapper and void are not implemented. + * + * @tparam T Type of the value provided. + * @param t Object provided. + * @return A future holding a decayed copy of the object provided. + */ +template +std::future> make_ready_future(T&& t) +{ + std::promise> p; + auto fut = p.get_future(); + p.set_value(std::forward(t)); + return fut; +} + +/** + * @brief Check the status of the future object. True indicates that the result is available in the + * future's shared state. False otherwise. + * + * The future shall not be created using `std::async(std::launch::deferred)`. Otherwise, this + * function always returns true. + * + * @tparam T Type of the future. + * @param future Instance of the future. + * @return Boolean answer indicating if the future is ready or not. + */ template bool is_future_done(T const& future) { - assert(future.valid()); + if (!future.valid()) { + throw std::invalid_argument("The future object does not refer to a valid shared state."); + } return future.wait_for(std::chrono::seconds(0)) != std::future_status::timeout; } diff --git a/cpp/src/file_handle.cpp b/cpp/src/file_handle.cpp index 882799e016..a1bb5781ee 100644 --- a/cpp/src/file_handle.cpp +++ b/cpp/src/file_handle.cpp @@ -26,6 +26,7 @@ #include #include #include +#include "kvikio/utils.hpp" namespace kvikio { @@ -212,10 +213,9 @@ std::future FileHandle::pread(void* buf, if (size < gds_threshold) { PushAndPopContext c(ctx); auto bytes_read = detail::posix_device_read(_fd_direct_off.fd(), buf, size, file_offset, 0); - std::promise read_promise; - auto read_future = read_promise.get_future(); - read_promise.set_value(bytes_read); - return read_future; + // Maintain API consistency while making this trivial case synchronous. + // The result in the future is immediately available after the call. + return make_ready_future(bytes_read); } // Let's synchronize once instead of in each task. @@ -263,10 +263,9 @@ std::future FileHandle::pwrite(void const* buf, if (size < gds_threshold) { PushAndPopContext c(ctx); auto bytes_write = detail::posix_device_write(_fd_direct_off.fd(), buf, size, file_offset, 0); - std::promise write_promise; - auto write_future = write_promise.get_future(); - write_promise.set_value(bytes_write); - return write_future; + // Maintain API consistency while making this trivial case synchronous. + // The result in the future is immediately available after the call. + return make_ready_future(bytes_write); } // Let's synchronize once instead of in each task.