[prototype] boost fiber instead of asio for async #16699

Closed
wants to merge 18 commits into from
3 changes: 3 additions & 0 deletions BUILD.bazel
@@ -40,6 +40,8 @@ cc_library(
deps = [
":ray_common",
"@boost//:asio",
"@boost//:fiber",
"@com_google_absl//absl/types:optional",
"@com_github_grpc_grpc//:grpc++",
"@com_google_protobuf//:protobuf",
],
@@ -402,6 +404,7 @@ cc_library(
"//src/ray/protobuf:common_cc_proto",
"//src/ray/protobuf:gcs_cc_proto",
"@boost//:asio",
"@boost//:fiber",
"@com_github_grpc_grpc//:grpc++",
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/container:flat_hash_set",
2 changes: 0 additions & 2 deletions python/ray/util/placement_group.py
@@ -73,7 +73,6 @@ def ready(self) -> ObjectRef:
# to schedule a single task.
bundle_index = 0
bundle = self.bundle_cache[bundle_index]

resource_name, value = self._get_a_non_zero_resource(bundle)
num_cpus = 0
num_gpus = 0
@@ -87,7 +86,6 @@ def ready(self) -> ObjectRef:
memory = value
else:
resources[resource_name] = value

return bundle_reservation_check.options(
num_cpus=num_cpus,
num_gpus=num_gpus,
9 changes: 9 additions & 0 deletions src/ray/common/asio_round_robin.cc
@@ -0,0 +1,9 @@
#include "ray/common/asio_round_robin.h"
namespace boost {
namespace fibers {
namespace asio {

boost::asio::io_context::id round_robin::service::id;
}  // namespace asio
} // namespace fibers
} // namespace boost
177 changes: 177 additions & 0 deletions src/ray/common/asio_round_robin.h
@@ -0,0 +1,177 @@
// Copyright Oliver Kowalke 2013.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#ifndef BOOST_FIBERS_ASIO_ROUND_ROBIN_H
#define BOOST_FIBERS_ASIO_ROUND_ROBIN_H

#include <chrono>
#include <cstddef>
#include <memory>
#include <mutex>
#include <queue>

#include <boost/asio.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/assert.hpp>
#include <boost/config.hpp>

#include <boost/fiber/condition_variable.hpp>
#include <boost/fiber/context.hpp>
#include <boost/fiber/mutex.hpp>
#include <boost/fiber/operations.hpp>
#include <boost/fiber/scheduler.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
#include BOOST_ABI_PREFIX
#endif

namespace boost {
namespace fibers {
namespace asio {

// Fiber scheduling algorithm adapted from the Boost.Fiber asio example:
// it lets fibers and plain asio handlers share a single io_context event loop.
class round_robin : public algo::algorithm {
private:
std::shared_ptr<boost::asio::io_context> io_ctx_;
boost::asio::steady_timer suspend_timer_;
boost::fibers::scheduler::ready_queue_type rqueue_{};
boost::fibers::mutex mtx_{};
boost::fibers::condition_variable cnd_{};
std::size_t counter_{0};

public:
struct service : public boost::asio::io_context::service {
static boost::asio::io_context::id id;

std::unique_ptr<boost::asio::io_context::work> work_;

service(boost::asio::io_context &io_ctx)
: boost::asio::io_context::service(io_ctx),
work_{new boost::asio::io_context::work(io_ctx)} {}

virtual ~service() {}

service(service const &) = delete;
service &operator=(service const &) = delete;

void shutdown_service() override final { work_.reset(); }
};

round_robin(std::shared_ptr<boost::asio::io_context> const &io_ctx)
: io_ctx_(io_ctx), suspend_timer_(*io_ctx_) {
boost::asio::add_service(*io_ctx_, new service(*io_ctx_));
boost::asio::post(*io_ctx_, [this]() mutable {
//]
//[asio_rr_service_lambda
while (!io_ctx_->stopped()) {
if (has_ready_fibers()) {
// run all pending handlers in round_robin
while (io_ctx_->poll())
;
// block this fiber till all pending (ready) fibers are processed
// == round_robin::suspend_until() has been called
std::unique_lock<boost::fibers::mutex> lk(mtx_);
cnd_.wait(lk);
} else {
// run one handler inside io_context
// if no handler available, block this thread
if (!io_ctx_->run_one()) {
break;
}
}
}
//]
});
}

void awakened(context *ctx) noexcept {
BOOST_ASSERT(nullptr != ctx);
BOOST_ASSERT(!ctx->ready_is_linked());
ctx->ready_link(rqueue_); /*< fiber, enqueue on ready queue >*/
if (!ctx->is_context(boost::fibers::type::dispatcher_context)) {
++counter_;
}
}

context *pick_next() noexcept {
context *ctx(nullptr);
if (!rqueue_.empty()) { /*<
pop an item from the ready queue
>*/
ctx = &rqueue_.front();
rqueue_.pop_front();
BOOST_ASSERT(nullptr != ctx);
BOOST_ASSERT(context::active() != ctx);
if (!ctx->is_context(boost::fibers::type::dispatcher_context)) {
--counter_;
}
}
return ctx;
}

bool has_ready_fibers() const noexcept { return 0 < counter_; }

//[asio_rr_suspend_until
void suspend_until(std::chrono::steady_clock::time_point const &abs_time) noexcept {
// Set a timer so at least one handler will eventually fire, causing
// run_one() to eventually return.
if ((std::chrono::steady_clock::time_point::max)() != abs_time) {
// Each expires_at(time_point) call cancels any previous pending
// call. We could inadvertently spin like this:
// dispatcher calls suspend_until() with earliest wake time
// suspend_until() sets suspend_timer_
// lambda loop calls run_one()
// some other asio handler runs before timer expires
// run_one() returns to lambda loop
// lambda loop yields to dispatcher
// dispatcher finds no ready fibers
// dispatcher calls suspend_until() with SAME wake time
// suspend_until() sets suspend_timer_ to same time, canceling
// previous async_wait()
// lambda loop calls run_one()
// asio calls suspend_timer_ handler with operation_aborted
// run_one() returns to lambda loop... etc. etc.
// So only actually set the timer when we're passed a DIFFERENT
// abs_time value.
suspend_timer_.expires_at(abs_time);
suspend_timer_.async_wait(
[](boost::system::error_code const &) { this_fiber::yield(); });
}
cnd_.notify_one();
}
//]

//[asio_rr_notify
void notify() noexcept {
// Something has happened that should wake one or more fibers BEFORE
// suspend_timer_ expires. Reset the timer to cause it to fire
// immediately, causing the run_one() call to return. In theory we
// could use cancel() because we don't care whether suspend_timer_'s
// handler is called with operation_aborted or success. However --
// cancel() doesn't change the expiration time, and we use
// suspend_timer_'s expiration time to decide whether it's already
// set. If suspend_until() set some specific wake time, then notify()
// canceled it, then suspend_until() was called again with the same
// wake time, it would match suspend_timer_'s expiration time and we'd
// refrain from setting the timer. So instead of simply calling
// cancel(), reset the timer, which cancels the pending sleep AND sets
// a new expiration time. This will cause us to spin the loop twice --
// once for the operation_aborted handler, once for timer expiration
// -- but that shouldn't be a big problem.
suspend_timer_.async_wait(
[](boost::system::error_code const &) { this_fiber::yield(); });
suspend_timer_.expires_at(std::chrono::steady_clock::now());
}
//]
};

} // namespace asio
} // namespace fibers
} // namespace boost

#ifdef BOOST_HAS_ABI_HEADERS
#include BOOST_ABI_SUFFIX
#endif

#endif // BOOST_FIBERS_ASIO_ROUND_ROBIN_H
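
For context on how this scheduler is meant to be driven, here is a minimal stand-alone sketch (not part of this diff) in the style of the upstream Boost.Fiber asio example: the thread installs the algorithm on its io_context, after which fibers and plain asio handlers interleave on the same event loop. The stop()/run() wiring and the printed strings are illustrative only.

// Illustrative sketch only -- not part of this change.
#include <chrono>
#include <iostream>
#include <memory>
#include <boost/asio.hpp>
#include <boost/fiber/all.hpp>
#include "ray/common/asio_round_robin.h"

int main() {
  auto io_ctx = std::make_shared<boost::asio::io_context>();
  // Install the fiber scheduler; its constructor posts the internal loop that
  // keeps pumping io_ctx while yielding to ready fibers.
  boost::fibers::use_scheduling_algorithm<boost::fibers::asio::round_robin>(io_ctx);

  // A plain asio handler and a fiber are now driven by the same loop.
  boost::asio::post(*io_ctx, [] { std::cout << "asio handler\n"; });

  boost::fibers::fiber([io_ctx] {
    std::cout << "fiber: before sleep\n";
    boost::this_fiber::sleep_for(std::chrono::milliseconds(1));
    std::cout << "fiber: after sleep\n";
    io_ctx->stop();  // let run() below return
  }).detach();

  io_ctx->run();
  return 0;
}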
10 changes: 10 additions & 0 deletions src/ray/common/thread_pool.cc
@@ -0,0 +1,10 @@
#include "ray/common/thread_pool.h"

namespace ray {
namespace thread_pool {

// Process-wide pools used by the cpu_post()/io_post() helpers in thread_pool.h.
CPUThreadPool _cpu_pool(4);
IOThreadPool _io_pool;

} // namespace thread_pool
} // namespace ray
92 changes: 92 additions & 0 deletions src/ray/common/thread_pool.h
@@ -0,0 +1,92 @@
#pragma once
#include <boost/asio.hpp>
#include <boost/fiber/all.hpp>
#include <memory>
#include <type_traits>
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/asio_round_robin.h"

namespace ray {
namespace thread_pool {

namespace {

// Run f and fulfill the fiber promise with its result (non-void return type).
template <typename F, typename R = typename std::result_of<F()>::type,
          typename std::enable_if<!std::is_same<R, void>::value, int>::type = 0>
void _run_job(F &&f, boost::fibers::promise<R> *p) {
  p->set_value(f());
}

// Run f and fulfill the fiber promise (void return type).
template <typename F, typename R = typename std::result_of<F()>::type,
          typename std::enable_if<std::is_same<R, void>::value, int>::type = 0>
void _run_job(F &&f, boost::fibers::promise<R> *p) {
  f();
  p->set_value();
}

} // namespace

class CPUThreadPool {
public:
CPUThreadPool(size_t n) : pool_(n) {}
// Post a job to the underlying asio thread_pool. A boost::fibers::future is
// returned immediately; ownership of the promise passes to the posted task,
// which deletes it after setting the result.
template <typename F>
auto post(F &&f) {
  auto p = std::make_unique<boost::fibers::promise<decltype(f())>>().release();
  auto future = p->get_future();
  boost::asio::post(pool_, [f = std::move(f), p = p] {
    _run_job(std::move(f), p);
    delete p;
  });
  return future;
}

~CPUThreadPool() { pool_.join(); }

private:
boost::asio::thread_pool pool_;
};

class IOThreadPool {
public:
IOThreadPool() : io_service_(std::make_shared<instrumented_io_context>()) {
boost::fibers::use_scheduling_algorithm<boost::fibers::asio::round_robin>(
io_service_);
}

// Post a job to the shared io_context. The job is wrapped in its own fiber, so
// it may use fiber-blocking primitives (futures, mutexes); the posted handler
// joins that fiber before returning.
template <typename F>
auto post(F &&f) {
  auto p = std::make_unique<boost::fibers::promise<decltype(f())>>().release();
  auto future = p->get_future();
  io_service_->post([f = std::move(f), p] {
    boost::fibers::fiber co([f = std::move(f), p]() {
      _run_job(std::move(f), p);
      delete p;
    });
    co.join();
  });
  return future;
}

instrumented_io_context &GetIOService() { return *io_service_; }

bool stopped() { return io_service_->stopped(); }

private:
std::shared_ptr<instrumented_io_context> io_service_;
};

extern CPUThreadPool _cpu_pool;
extern IOThreadPool _io_pool;

template <typename F, typename... Ts>
auto io_post(F &&f, Ts &&... args) {
return _io_pool.post(std::bind(std::forward<F>(f), std::forward<Ts>(args)...));
}

template <typename F, typename... Ts>
auto cpu_post(F &&f, Ts &&... args) {
return _cpu_pool.post(std::bind(std::forward<F>(f), std::forward<Ts>(args)...));
}

} // namespace thread_pool
} // namespace ray
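
A rough usage sketch (not part of this diff) of the helpers above. It assumes the shared io_context is driven by the calling thread the way the Boost.Fiber asio example drives it; the GetIOService().run()/stop() wiring here is illustrative, in Ray the existing main event loop would play that role.

// Illustrative sketch only -- not part of this change.
#include <iostream>
#include "ray/common/thread_pool.h"

int main() {
  namespace tp = ray::thread_pool;

  // io_post() wraps the job in a fiber on the shared io_context. Inside that
  // fiber we can offload CPU-bound work with cpu_post() and fiber-block on
  // the result; only this fiber is suspended while waiting.
  tp::io_post([] {
    auto sum = tp::cpu_post([] { return 1 + 2; });
    std::cout << "1 + 2 = " << sum.get() << std::endl;
    tp::_io_pool.GetIOService().stop();  // let run() below return
  });

  // Drive the io_context; in Ray this would be the existing event loop.
  tp::_io_pool.GetIOService().run();
  return 0;
}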