This RFC proposes a new thread pool that supports async operations, which is
called ReadPool
. It is designed to be specifically used for all sorts of read
operations like Kv Get, Kv Scan and Coprocessor Read to improve performance.
Currently, for all KV operations that stay inside Storage
, they are executed
on the same thread pool. This leads to bad operation isolation, i.e. heavy
writes will cause slow reads. In fact, these read operations can be executed on
other threads to be not blocked. For Coprocessor operations, we have an
end-point worker thread to handle async snapshots. This is a bottleneck when
there are heavy coprocessor requests.
By introducing a standalone thread pool that supports async operations (which is called ReadPool) in this RFC, we can fix both problems. For KV operations, read and write operations will be executed on different thread pools, so that they won't affect each other. For Coprocessor operations, since ReadPool supports async operations, the end-point worker is not necessary anymore and the bottleneck no longer exists.
In this RFC, there will be a ReadPool for KV operations and another ReadPool for Coprocessor operations, so that they won't block each other. In the future, it may be possible to merge them together.
The ReadPool provides these features:
-
Support different priorities (high, normal and low).
-
Support contexts and periodical ticking so that we can use local metrics.
-
Support max task threshold as a load control mechanism.
The ReadPool can be implemented via 2 levels. The lower level is a new facility
called util::FuturePool
, which is a simple encapsulation over
CpuPool that supports feature 2. The higher
level is server::ReadPool
, which assembles multiple FuturePool
s and
supports features 1 and 3. In this way, FuturePool
becomes a common facility
that can be reused in the future to run other kinds of async operations that
need contexts, not just limited to the scenario listed in this RFC.
To support periodical ticking, this RFC defines:
pub trait Context: fmt::Debug + Send {
fn on_tick(&mut self) {}
}
The FuturePool
user can provide customized impl Context
so that some code
can be executed periodically within the context of the thread pool's thread.
The FuturePool
is defined as follows:
pub struct FuturePool<T: Context + 'static> {
pool: CpuPool,
context: Arc<HashMap<thread::ThreadId, T>>
running_task_count: Arc<AtomicUsize>,
……
}
The logic of whether to call on_tick
is, when each task finishes running, we
check how much time has elapsed. If the elapsed time since the last tick is
longer than our tick interval, we call on_tick
. In order to do so in a clean
way, we can wrap this logic into a ContextDelegator
:
struct ContextDelegator<T: Context> {
tick_interval: Duration,
inner: RefCell<T>,
last_tick: Cell<Option<Instant>>,
}
impl<T: Context> ContextDelegator<T> {
fn new(context: T, tick_interval: Duration) -> ContextDelegator<T> {
// ...
}
fn on_task_finish(&self) {
// check and optionally call `self.inner.borrow_mut().on_tick()`
}
}
FuturePool
users should pass a Future
to the FuturePool
to execute, just
like CpuPool
. However, to obtain access to the Context
inside Future
, the
provided Future
should be wrapped in a closure which provides access to the
Context
via the parameter. In the further, the parameter may live multiple
Future
s, which may run on different threads and different Context
s. Thus,
the parameter is a Context
accessor instead of a specific Context
.
As a result, this RFC further introduces the following struct, which provides
an access to the current Context
based on running thread:
pub struct ContextDelegators<T: Context> {
...
}
impl<T: Context> ContextDelegators<T> {
pub fn current_thread_context_mut(&self) -> RefMut<T> {
……
}
}
The FuturePool
provides spawn
interface to execute a Future
while
providing
ContextDelegators
:
impl<T: Context + 'static> FuturePool<T> {
pub fn spawn<F, R>(&self, future_factory: R) -> CpuFuture<F::Item, F::Error>
where
R: FnOnce(ContextDelegators<T>) -> F + Send + 'static,
F: Future + Send + 'static,
F::Item: Send + 'static,
F::Error: Send + 'static,
{
……
}
}
ReadPool
is implemented over FuturePool
as follows:
pub struct ReadPool<T: Context + 'static> {
pool_high: FuturePool<T>,
pool_normal: FuturePool<T>,
pool_low: FuturePool<T>,
max_tasks_high: usize,
max_tasks_normal: usize,
max_tasks_low: usize,
}
It also provides an interface similar to FuturePool::spawn
, which specifies
the priority and returns an error when FuturePool
is full:
pub enum Priority {
Normal,
Low,
High,
}
impl<T: futurepool::Context + 'static> ReadPool<T> {
pub fn future_execute<F, R>(
&self,
priority: Priority,
future_factory: R,
) -> Result<CpuFuture<F::Item, F::Error>, Full>
where
R: FnOnce(ContextDelegators<T>) -> F + Send + 'static,
F: Future + Send + 'static,
F::Item: Send + 'static,
F::Error: Send + 'static,
{
let pool = self.get_pool_by_priority(priority);
let max_tasks = self.get_max_tasks_by_priority(priority);
let current_tasks = pool.get_running_task_count();
if current_tasks >= max_tasks {
Err(Full {
current_tasks,
max_tasks,
})
} else {
Ok(pool.spawn(future_factory))
}
}
}
The on_tick
implementation in FuturePool
is not perfect. It is driven by
task completion, which means it will not be executed when there is no task.
This is acceptable since we are going to use on_tick
to report metrics.
We can implement our own future pool instead of utilizing CpuPool
. In this
way, we can have a true on_tick
. However, this is complex and is not a
necessary feature we need.
We can implement our own async model instead of utilizing Future
. However,
there is no benefits compared to the current solution.
None.