Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry on HTTP 50x errors #603

Open
wants to merge 28 commits into
base: branch-25.04
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
e2934fa
Retry on HTTP 50x errors
TomAugspurger Jan 29, 2025
2ef47e4
Throw 500 errors on HEAD too
TomAugspurger Feb 3, 2025
bf33697
Added C++ tests for parse_http_status_codes
TomAugspurger Feb 3, 2025
d7d377b
Added timeouts to benchmark tests
TomAugspurger Feb 4, 2025
d89bf5e
docfix
TomAugspurger Feb 4, 2025
b641240
Use threads
TomAugspurger Feb 4, 2025
9a01f42
Don't export curl dependency
KyleFromNVIDIA Feb 6, 2025
5b7ccbd
Propagate KvikIO_REMOTE_SUPPORT
KyleFromNVIDIA Feb 6, 2025
58400cb
CURLINFO_RESPONSE_CODE uses long (not int)
madsbk Feb 7, 2025
5860f41
getenv_or(): move std::vector<int>
madsbk Feb 7, 2025
b418c00
docstring
TomAugspurger Feb 7, 2025
54f8a34
Updated runtime_settings
TomAugspurger Feb 7, 2025
fb888c2
max_attempts -> http_max_attempts
TomAugspurger Feb 7, 2025
3ccec96
Merge remote-tracking branch 'upstream/branch-25.04' into tom/retry-http
TomAugspurger Feb 7, 2025
6a7235e
Address comments
TomAugspurger Feb 7, 2025
774dc58
doc fix
TomAugspurger Feb 7, 2025
ec31221
Merge branch 'branch-25.04' into tom/retry-http
TomAugspurger Feb 10, 2025
1644485
Apply fixes
TomAugspurger Feb 10, 2025
7edb900
back to const
TomAugspurger Feb 10, 2025
94b7147
PR review
TomAugspurger Feb 11, 2025
d6775b7
Fixup
TomAugspurger Feb 11, 2025
5e5e64a
Redo backoff
TomAugspurger Feb 11, 2025
5ff1ab7
Fixups
TomAugspurger Feb 11, 2025
17d5fe0
Fixup
TomAugspurger Feb 13, 2025
17895c4
formatting
TomAugspurger Feb 13, 2025
85d4fd7
Fixup
TomAugspurger Feb 14, 2025
522e5d8
Merge remote-tracking branch 'upstream/branch-25.04' into tom/retry-http
TomAugspurger Feb 14, 2025
88133fa
removed comments
TomAugspurger Feb 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions conda/environments/all_cuda-118_arch-aarch64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies:
- pre-commit
- pytest
- pytest-cov
- pytest-timeout
- python>=3.10,<3.13
- rangehttpserver
- rapids-build-backend>=0.3.0,<0.4.0.dev0
Expand Down
1 change: 1 addition & 0 deletions conda/environments/all_cuda-118_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies:
- pre-commit
- pytest
- pytest-cov
- pytest-timeout
- python>=3.10,<3.13
- rangehttpserver
- rapids-build-backend>=0.3.0,<0.4.0.dev0
Expand Down
1 change: 1 addition & 0 deletions conda/environments/all_cuda-128_arch-aarch64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies:
- pre-commit
- pytest
- pytest-cov
- pytest-timeout
- python>=3.10,<3.13
- rangehttpserver
- rapids-build-backend>=0.3.0,<0.4.0.dev0
Expand Down
1 change: 1 addition & 0 deletions conda/environments/all_cuda-128_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies:
- pre-commit
- pytest
- pytest-cov
- pytest-timeout
- python>=3.10,<3.13
- rangehttpserver
- rapids-build-backend>=0.3.0,<0.4.0.dev0
Expand Down
22 changes: 21 additions & 1 deletion cpp/doxygen/main_page.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,30 @@ To improve performance of small IO requests, `.pread()` and `.pwrite()` implemen
This setting can also be controlled by `defaults::gds_threshold()` and `defaults::gds_threshold_reset()`.

#### Size of the Bounce Buffer (KVIKIO_GDS_THRESHOLD)
KvikIO might have to use intermediate host buffers (one per thread) when copying between files and device memory. Set the environment variable ``KVIKIO_BOUNCE_BUFFER_SIZE`` to the size (in bytes) of these "bounce" buffers. If not set, the default value is 16777216 (16 MiB).
KvikIO might have to use intermediate host buffers (one per thread) when copying between files and device memory. Set the environment variable `KVIKIO_BOUNCE_BUFFER_SIZE` to the size (in bytes) of these "bounce" buffers. If not set, the default value is 16777216 (16 MiB).

This setting can also be controlled by `defaults::bounce_buffer_size()` and `defaults::bounce_buffer_size_reset()`.

#### HTTP Retries

The behavior when a remote IO read returns a error can be controlled through the `KVIKIO_HTTP_STATUS_CODES` and `KVIKIO_HTTP_MAX_ATTEMPTS` environment variables.
`KVIKIO_HTTP_STATUS_CODES` controls the status codes to retry, and `KVIKIO_HTTP_MAX_ATTEMPTS` controls the maximum number of attempts to make before throwing an exception.

When a response with a status code in the list of retryable codes is received, KvikIO will wait for some period of time before retrying the request.
It will keep retrying until reaching the maximum number of attempts.

By default, KvikIO will retry responses with the following status codes:

- 429
- 500
- 502
- 503
- 504

KvikIO will, by default, make three attempts per read.
Note that if you're reading a large file that has been split into multiple reads through the KvikIO's task size setting, then *each* task will be retried up to the maximum number of attempts.

These settings can also be controlled by `defaults::http_max_attempts()`, `defaults::http_max_attempts_reset()`, `defaults::http_status_codes()`, and `defaults::http_status_codes_reset()`.

## Example

Expand Down
61 changes: 60 additions & 1 deletion cpp/include/kvikio/defaults.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,19 @@ namespace detail {
*/
CompatMode parse_compat_mode_str(std::string_view compat_mode_str);

/**
* @brief Parse a string of comma-separated string of HTTP status codes.
*
* @param env_var_name The environment variable holding the string.
* Used to report errors.
* @param status_codes The comma-separated string of HTTP status
* codes. Each code should be a 3-digit integer.
*
* @return The vector with the parsed, integer HTTP status codes.
*/
std::vector<int> parse_http_status_codes(std::string_view env_var_name,
std::string const& status_codes);

} // namespace detail

template <typename T>
Expand All @@ -81,6 +94,9 @@ bool getenv_or(std::string_view env_var_name, bool default_val);
template <>
CompatMode getenv_or(std::string_view env_var_name, CompatMode default_val);

template <>
std::vector<int> getenv_or(std::string_view env_var_name, std::vector<int> default_val);

/**
* @brief Singleton class of default values used throughout KvikIO.
*
Expand All @@ -92,6 +108,8 @@ class defaults {
std::size_t _task_size;
std::size_t _gds_threshold;
std::size_t _bounce_buffer_size;
std::size_t _http_max_attempts;
std::vector<int> _http_status_codes;

static unsigned int get_num_threads_from_env();

Expand Down Expand Up @@ -181,7 +199,7 @@ class defaults {
* always use the same thread pool however it is possible to change number of
* threads in the pool (see `kvikio::default::thread_pool_nthreads_reset()`).
*
* @return The the default thread pool instance.
* @return The default thread pool instance.
*/
[[nodiscard]] static BS::thread_pool& thread_pool();

Expand Down Expand Up @@ -258,6 +276,47 @@ class defaults {
* @param nbytes The bounce buffer size in bytes.
*/
static void bounce_buffer_size_reset(std::size_t nbytes);

/**
* @brief Get the maximum number of attempts per remote IO read.
*
* Set the value using `kvikio::default::http_max_attempts_reset()` or by setting
* the `KVIKIO_HTTP_MAX_ATTEMPTS` environment variable. If not set, the value is 3.
*
* @return The maximum number of remote IO reads to attempt before raising an
* error.
*/
[[nodiscard]] static std::size_t http_max_attempts();

/**
* @brief Reset the maximum number of attempts per remote IO read.
*
* @param attempts The maximum number of attempts to try before raising an error.
*/
static void http_max_attempts_reset(std::size_t attempts);

/**
* @brief The list of HTTP status codes to retry.
*
* Set the value using `kvikio::default::http_status_codes()` or by setting the
* `KVIKIO_HTTP_STATUS_CODES` environment variable. If not set, the default value is
*
* - 429
* - 500
* - 502
* - 503
* - 504
*
* @return The list of HTTP status codes to retry.
*/
[[nodiscard]] static std::vector<int> const& http_status_codes();

/**
* @brief Reset the list of HTTP status codes to retry.
*
* @param status_codes The HTTP status codes to retry.
*/
static void http_status_codes_reset(std::vector<int> status_codes);
};

} // namespace kvikio
62 changes: 62 additions & 0 deletions cpp/src/defaults.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <algorithm>
#include <cstddef>
#include <cstdlib>
#include <regex>
#include <sstream>
#include <stdexcept>
#include <string>
Expand Down Expand Up @@ -49,6 +50,28 @@ CompatMode parse_compat_mode_str(std::string_view compat_mode_str)
return res;
}

std::vector<int> parse_http_status_codes(std::string_view env_var_name,
std::string const& status_codes)
{
// Ensure `status_codes` consists only of 3-digit integers separated by commas, allowing spaces.
std::regex const check_pattern(R"(^\s*\d{3}\s*(\s*,\s*\d{3}\s*)*$)");
if (!std::regex_match(status_codes, check_pattern)) {
throw std::invalid_argument(std::string{env_var_name} +
": invalid format, expected comma-separated integers.");
}

// Match every integer in `status_codes`.
std::regex const number_pattern(R"(\d+)");

// For each match, we push_back `std::stoi(match.str())` into `ret`.
std::vector<int> ret;
std::transform(std::sregex_iterator(status_codes.begin(), status_codes.end(), number_pattern),
std::sregex_iterator(),
std::back_inserter(ret),
[](std::smatch const& match) -> int { return std::stoi(match.str()); });
return ret;
}

} // namespace detail

template <>
Expand Down Expand Up @@ -91,6 +114,17 @@ CompatMode getenv_or(std::string_view env_var_name, CompatMode default_val)
return detail::parse_compat_mode_str(env_val);
}

template <>
std::vector<int> getenv_or(std::string_view env_var_name, std::vector<int> default_val)
{
auto* const env_val = std::getenv(env_var_name.data());
if (env_val == nullptr) { return std::move(default_val); }
std::string const int_str(env_val);
if (int_str.empty()) { return std::move(default_val); }

return detail::parse_http_status_codes(env_var_name, int_str);
}

unsigned int defaults::get_num_threads_from_env()
{
int const ret = getenv_or("KVIKIO_NTHREADS", 1);
Expand Down Expand Up @@ -132,6 +166,19 @@ defaults::defaults()
}
_bounce_buffer_size = env;
}
// Determine the default value of `http_max_attempts`
{
const ssize_t env = getenv_or("KVIKIO_HTTP_MAX_ATTEMPTS", 3);
if (env <= 0) {
TomAugspurger marked this conversation as resolved.
Show resolved Hide resolved
throw std::invalid_argument("KVIKIO_HTTP_MAX_ATTEMPTS has to be a positive integer");
}
_http_max_attempts = env;
}
// Determine the default value of `http_status_codes`
{
_http_status_codes =
getenv_or("KVIKIO_HTTP_STATUS_CODES", std::vector<int>{429, 500, 502, 503, 504});
}
}

defaults* defaults::instance()
Expand Down Expand Up @@ -200,4 +247,19 @@ void defaults::bounce_buffer_size_reset(std::size_t nbytes)
instance()->_bounce_buffer_size = nbytes;
}

std::size_t defaults::http_max_attempts() { return instance()->_http_max_attempts; }
kingcrimsontianyu marked this conversation as resolved.
Show resolved Hide resolved

void defaults::http_max_attempts_reset(std::size_t attempts)
{
if (attempts == 0) { throw std::invalid_argument("attempts must be a positive integer"); }
instance()->_http_max_attempts = attempts;
}

std::vector<int> const& defaults::http_status_codes() { return instance()->_http_status_codes; }

void defaults::http_status_codes_reset(std::vector<int> status_codes)
{
instance()->_http_status_codes = std::move(status_codes);
}

} // namespace kvikio
61 changes: 50 additions & 11 deletions cpp/src/shim/libcurl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
* limitations under the License.
*/

#include <chrono>
#include <cstring>
#include <functional>
#include <iostream>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

#include <curl/curl.h>
Expand Down Expand Up @@ -116,19 +119,55 @@ CURL* CurlHandle::handle() noexcept { return _handle.get(); }

void CurlHandle::perform()
Copy link
Contributor

@kingcrimsontianyu kingcrimsontianyu Feb 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest reorganizing the logic below to improve readability, such as separating handling of error code from http code, early breaking to reduce the indentation level. This is what I have in mind. Let me know of your thought:

// Untested code. Please check!
void CurlHandle::perform()
{
  long http_code               = 0;
  auto constexpr base_delay_ms = 500;
  auto delay_multiplier        = 1;
  auto backoff_delay_ms        = 0;
  auto max_delay_ms            = kvikio::defaults::http_max_delay_ms();
  auto& http_status_codes      = kvikio::defaults::http_status_codes();
  auto attempt_count           = 0;

  while (true) {
    ++attempt_count;
    auto err = curl_easy_perform(handle());
    if (err != CURLE_OK) {
      std::string msg(_errbuf);  // We can do this because we always initialize `_errbuf` as empty.
      std::stringstream ss;
      ss << "curl_easy_perform() error near " << _source_file << ":" << _source_line;
      if (msg.empty()) {
        ss << "(" << curl_easy_strerror(err) << ")";
      } else {
        ss << "(" << msg << ")";
      }
      throw std::runtime_error(ss.str());
    }

    curl_easy_getinfo(handle(), CURLINFO_RESPONSE_CODE, &http_code);

    // Check if we should retry based on HTTP status code
    if (std::find(http_status_codes.begin(), http_status_codes.end(), http_code) ==
        http_status_codes.end()) {
      // No retry needed
      break;
    }

    // Retry only if one of the specified status codes is returned
    // TODO: Parse the Retry-After header, if it exists.
    // TODO: configurable maximum wait.

    // Current status report
    std::cout << "KvikIO: Retrying HTTP request. Got HTTP code " << http_code << " after "
              << backoff_delay_ms << "ms (attempt " << attempt_count << ")." << std::endl;

    // Prepare for the next attempt
    // backoff and retry again. With a base value of 500ms, we retry after
    // 500ms, 1s, 2s, 4s, ...
    backoff_delay_ms = base_delay_ms * delay_multiplier;
    delay_multiplier <<= 1;

    if (backoff_delay_ms > max_delay_ms) {
      std::stringstream ss;
      ss << "KvikIO: HTTP request reached maximum delay (" << max_delay_ms << "). Got HTTP code "
         << http_code << ".";
      throw std::runtime_error(ss.str());
    }

    std::this_thread::sleep_for(std::chrono::milliseconds(backoff_delay_ms));
  }
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering whether a refactor like this made sense now. Let me take a look.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

17d5fe0 has something, if you're able to take another look. That's a bit of a compromise between the earlier setup and your suggestion:

  • It does use the attempt_count in the while loop condition, instead of while (true). But the case where we've exceeded our maximum attempts is moved out of the loop, and runs when we break
  • I've added the early return for the case where things are OK, reducing the indentation level

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. The changes look good to me! Please do test it a bit.

Side note: Hopefully we will improve the way of testing in the future through mocking (#634).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we have OK testing for this through the Python tests here.

That checks that we fail after two attempts, and the the expected message printed after the first attempt failed.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an off by one error when the user sets the set_http_max_attempts to 1. Looking into it now.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 85d4fd7.

Now the messages are

KvikIO: Got HTTP code 503. Retrying after 500ms (attempt 1 of 2).  # on the first failure
KvikIO: HTTP request reached maximum number of attempts (2). Got HTTP code 503.  # on the second / final failure

{
// Perform the curl operation and check for errors.
CURLcode err = curl_easy_perform(handle());
if (err != CURLE_OK) {
std::string msg(_errbuf); // We can do this because we always initialize `_errbuf` as empty.
std::stringstream ss;
ss << "curl_easy_perform() error near " << _source_file << ":" << _source_line;
if (msg.empty()) {
ss << "(" << curl_easy_strerror(err) << ")";
long http_code = 0;
auto attempt_count = 1;
auto base_delay = 500; // milliseconds
auto max_delay = 4000; // milliseconds
auto http_max_attempts = kvikio::defaults::http_max_attempts();
auto& http_status_codes = kvikio::defaults::http_status_codes();

while (attempt_count <= http_max_attempts) {
auto err = curl_easy_perform(handle());
curl_easy_getinfo(handle(), CURLINFO_RESPONSE_CODE, &http_code);

// Check if we should retry based on HTTP status code
if (std::find(http_status_codes.begin(), http_status_codes.end(), http_code) !=
http_status_codes.end()) {
// Retry only if one of the specified status codes is returned
// TODO: Parse the Retry-After header, if it exists.
// TODO: configurable maximum wait.
if (attempt_count >= http_max_attempts) {
std::stringstream ss;
ss << "KvikIO: HTTP request reached maximum number of attempts (" << http_max_attempts
<< "). Got HTTP code " << http_code << ".";
throw std::runtime_error(ss.str());
} else {
// backoff and retry again. With a base value of 500ms, we retry after
// 500ms, 1s, 2s, 4s, ...
auto const backoff_delay = base_delay * (1 << std::min(attempt_count - 1, 4));
// up to a maximum of `max_delay` seconds.
auto const delay = std::min(max_delay, backoff_delay);

attempt_count++;
std::cout << "KvikIO: Retrying HTTP request. Got HTTP code " << http_code << " after "
kingcrimsontianyu marked this conversation as resolved.
Show resolved Hide resolved
<< delay << "ms (attempt " << attempt_count << " of " << http_max_attempts << ")."
<< std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(delay));
}
} else if (err != CURLE_OK) {
std::string msg(_errbuf); // We can do this because we always initialize `_errbuf` as empty.
std::stringstream ss;
ss << "curl_easy_perform() error near " << _source_file << ":" << _source_line;
kingcrimsontianyu marked this conversation as resolved.
Show resolved Hide resolved
if (msg.empty()) {
ss << "(" << curl_easy_strerror(err) << ")";
} else {
ss << "(" << msg << ")";
}
throw std::runtime_error(ss.str());
} else {
ss << "(" << msg << ")";
// No retry needed
break;
}
throw std::runtime_error(ss.str());
}
}

} // namespace kvikio
21 changes: 21 additions & 0 deletions cpp/tests/test_defaults.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,24 @@ TEST(Defaults, parse_compat_mode_str)
}
}
}

TEST(Defaults, parse_http_status_codes)
{
{
std::vector<std::string> inputs{
"429,500", "429, 500", " 429,500", "429, 500", "429 ,500", "429,500 "};
std::vector<int> expected = {429, 500};
for (const auto& input : inputs) {
EXPECT_EQ(kvikio::detail::parse_http_status_codes("KVIKIO_HTTP_STATUS_CODES", input),
expected);
}
}

{
std::vector<std::string> inputs{"429,", ",429", "a,b", "429,,500", "429,1000"};
for (const auto& input : inputs) {
EXPECT_THROW(kvikio::detail::parse_http_status_codes("KVIKIO_HTTP_STATUS_CODES", input),
std::invalid_argument);
}
}
}
1 change: 1 addition & 0 deletions dependencies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ dependencies:
- rapids-dask-dependency==25.4.*,>=0.0.0a0
- pytest
- pytest-cov
- pytest-timeout
- rangehttpserver
- boto3>=1.21.21
- output_types: [requirements, pyproject]
Expand Down
9 changes: 9 additions & 0 deletions docs/source/runtime_settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,12 @@ Size of the Bounce Buffer ``KVIKIO_BOUNCE_BUFFER_SIZE``
KvikIO might have to use intermediate host buffers (one per thread) when copying between files and device memory. Set the environment variable ``KVIKIO_BOUNCE_BUFFER_SIZE`` to the size (in bytes) of these "bounce" buffers. If not set, the default value is 16777216 (16 MiB).

This setting can also be controlled by :py:func:`kvikio.defaults.bounce_buffer_size`, :py:func:`kvikio.defaults.bounce_buffer_size_reset`, and :py:func:`kvikio.defaults.set_bounce_buffer_size`.

#### HTTP Retries
-----------------

The behavior when a remote IO read returns a error can be controlled through the `KVIKIO_HTTP_STATUS_CODES` and `KVIKIO_HTTP_MAX_ATTEMPTS` environment variables.

`KVIKIO_HTTP_STATUS_CODES` controls the status codes to retry and can be controlled by :py:func:`kvikio.defaults.http_status_codes`, :py:func:`kvikio.defaults.http_status_codes_reset`, and :py:func:`kvikio.defaults.set_http_status_codes`.

`KVIKIO_HTTP_MAX_ATTEMPTS` controls the maximum number of attempts to make before throwing an exception and can be controlled by :py:func:`kvikio.defaults.http_max_attempts`, :py:func:`kvikio.defaults.http_max_attempts_reset`, and :py:func:`kvikio.defaults.set_http_max_attempts`.
Loading