diff --git a/cpp/include/kvikio/parallel_operation.hpp b/cpp/include/kvikio/parallel_operation.hpp index f345333c4f..e1c29d392e 100644 --- a/cpp/include/kvikio/parallel_operation.hpp +++ b/cpp/include/kvikio/parallel_operation.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2024, NVIDIA CORPORATION. + * Copyright (c) 2021-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,8 +17,10 @@ #include #include +#include #include #include +#include #include #include @@ -30,12 +32,60 @@ namespace kvikio { namespace detail { -template -std::future submit_task( - F op, T buf, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset) +/** + * @brief Utility function to create a copyable callable from a move-only callable. + * + * The underlying thread pool uses `std::function` (until C++23) or `std::move_only_function` + * (since C++23) as the element type of the task queue. For the former case that currently applies, + * the `std::function` requires its "target" (associated callable) to be copy-constructible. This + * utility function is a workaround for those move-only callables. + * + * @tparam F Callable type. F shall be move-only. + * @param op Callable. + * @return A new callable that satisfies the copy-constructible condition. + */ +template +auto make_copyable_lambda(F op) { - return defaults::thread_pool().submit_task( - [=] { return op(buf, size, file_offset, devPtr_offset); }); + // Create the callable on the heap by moving from op. Use a shared pointer to manage its lifetime. + auto sp = std::make_shared(std::move(op)); + + // Use the copyable closure as the proxy of the move-only callable. + return + [sp](auto&&... args) -> decltype(auto) { return (*sp)(std::forward(args)...); }; +} + +/** + * @brief Submit the task callable to the underlying thread pool. + * + * Both the callable and arguments shall satisfy copy-constructible. + * + * @tparam F Callable type. + * @tparam Args Argument type. + * @param op Callable. + * @param args Arguments to the callable. + * @return A future to be used later to check if the operation has finished its execution. + */ +template +std::future submit_task(F&& op, Args&&... args) +{ + static_assert(std::is_invocable_r_v, Args...>); + return defaults::thread_pool().submit_task([=] { return op(args...); }); +} + +/** + * @brief Submit the move-only task callable to the underlying thread pool. + * + * @tparam F Callable type. F shall be move-only and have no argument. + * @param op Callable. + * @return A future to be used later to check if the operation has finished its execution. + */ +template +std::future submit_move_only_task(F op_move_only) +{ + static_assert(std::is_invocable_r_v); + auto op_copyable = make_copyable_lambda(std::move(op_move_only)); + return defaults::thread_pool().submit_task(op_copyable); } } // namespace detail @@ -60,6 +110,8 @@ std::future parallel_io(F op, std::size_t task_size, std::size_t devPtr_offset) { + static_assert(std::is_invocable_r_v); + if (task_size == 0) { throw std::invalid_argument("`task_size` cannot be zero"); } // Single-task guard @@ -67,30 +119,35 @@ std::future parallel_io(F op, return detail::submit_task(op, buf, size, file_offset, devPtr_offset); } - // We know an upper bound of the total number of tasks - std::vector> tasks; - tasks.reserve(size / task_size + 2); + std::vector> tasks_before_last; + auto const num_full_tasks = size / task_size; + auto const remaining_bytes = size - num_full_tasks * task_size; + auto num_tasks{num_full_tasks}; + if (remaining_bytes != 0) { ++num_tasks; } + tasks_before_last.reserve(num_tasks - 1); - // 1) Submit `task_size` sized tasks - while (size >= task_size) { - tasks.push_back(detail::submit_task(op, buf, task_size, file_offset, devPtr_offset)); + // 1) Submit all tasks but the last one. These are all `task_size` sized tasks. + for (std::size_t i = 0; i < num_tasks - 1; ++i) { + tasks_before_last.push_back( + detail::submit_task(op, buf, task_size, file_offset, devPtr_offset)); file_offset += task_size; devPtr_offset += task_size; size -= task_size; } - // 2) Submit a task for the remainder - if (size > 0) { tasks.push_back(detail::submit_task(op, buf, size, file_offset, devPtr_offset)); } + // 2) Submit the last task, which consists of performing the last I/O and waiting the previous + // tasks. + auto last_task_size = (remaining_bytes == 0) ? task_size : remaining_bytes; - // Finally, we sum the result of all tasks. - auto gather_tasks = [](std::vector>&& tasks) -> std::size_t { - std::size_t ret = 0; - for (auto& task : tasks) { + auto last_task = [=, tasks_before_last = std::move(tasks_before_last)]() mutable -> std::size_t { + std::size_t ret = op(buf, last_task_size, file_offset, devPtr_offset); + for (auto& task : tasks_before_last) { ret += task.get(); } return ret; }; - return std::async(std::launch::deferred, gather_tasks, std::move(tasks)); + + return detail::submit_move_only_task(std::move(last_task)); } } // namespace kvikio diff --git a/cpp/include/kvikio/utils.hpp b/cpp/include/kvikio/utils.hpp index b10e54c482..8194310198 100644 --- a/cpp/include/kvikio/utils.hpp +++ b/cpp/include/kvikio/utils.hpp @@ -15,6 +15,7 @@ */ #pragma once +#include #include #include #include @@ -149,9 +150,43 @@ class PushAndPopContext { std::tuple get_alloc_info(void const* devPtr, CUcontext* ctx = nullptr); +/** + * @brief Create a shared state in a future object that is immediately ready. + * + * A partial implementation of the namesake function from the concurrency TS + * (https://en.cppreference.com/w/cpp/experimental/make_ready_future). The cases of + * std::reference_wrapper and void are not implemented. + * + * @tparam T Type of the value provided. + * @param t Object provided. + * @return A future holding a decayed copy of the object provided. + */ +template +std::future> make_ready_future(T&& t) +{ + std::promise> p; + auto fut = p.get_future(); + p.set_value(std::forward(t)); + return fut; +} + +/** + * @brief Check the status of the future object. True indicates that the result is available in the + * future's shared state. False otherwise. + * + * The future shall not be created using `std::async(std::launch::deferred)`. Otherwise, this + * function always returns true. + * + * @tparam T Type of the future. + * @param future Instance of the future. + * @return Boolean answer indicating if the future is ready or not. + */ template bool is_future_done(T const& future) { + if (!future.valid()) { + throw std::invalid_argument("The future object does not refer to a valid shared state."); + } return future.wait_for(std::chrono::seconds(0)) != std::future_status::timeout; } diff --git a/cpp/src/file_handle.cpp b/cpp/src/file_handle.cpp index 0e65afb7fd..a1bb5781ee 100644 --- a/cpp/src/file_handle.cpp +++ b/cpp/src/file_handle.cpp @@ -26,6 +26,7 @@ #include #include #include +#include "kvikio/utils.hpp" namespace kvikio { @@ -210,11 +211,11 @@ std::future FileHandle::pread(void* buf, // Shortcut that circumvent the threadpool and use the POSIX backend directly. if (size < gds_threshold) { - auto task = [this, ctx, buf, size, file_offset]() -> std::size_t { - PushAndPopContext c(ctx); - return detail::posix_device_read(_fd_direct_off.fd(), buf, size, file_offset, 0); - }; - return std::async(std::launch::deferred, task); + PushAndPopContext c(ctx); + auto bytes_read = detail::posix_device_read(_fd_direct_off.fd(), buf, size, file_offset, 0); + // Maintain API consistency while making this trivial case synchronous. + // The result in the future is immediately available after the call. + return make_ready_future(bytes_read); } // Let's synchronize once instead of in each task. @@ -260,11 +261,11 @@ std::future FileHandle::pwrite(void const* buf, // Shortcut that circumvent the threadpool and use the POSIX backend directly. if (size < gds_threshold) { - auto task = [this, ctx, buf, size, file_offset]() -> std::size_t { - PushAndPopContext c(ctx); - return detail::posix_device_write(_fd_direct_off.fd(), buf, size, file_offset, 0); - }; - return std::async(std::launch::deferred, task); + PushAndPopContext c(ctx); + auto bytes_write = detail::posix_device_write(_fd_direct_off.fd(), buf, size, file_offset, 0); + // Maintain API consistency while making this trivial case synchronous. + // The result in the future is immediately available after the call. + return make_ready_future(bytes_write); } // Let's synchronize once instead of in each task.