Merge branch 'master' into Connor1996-patch-1
Connor1996 committed Nov 26, 2018
2 parents 918d432 + 32f1947 commit b31552c
Showing 2 changed files with 288 additions and 0 deletions.

text/2017-12-22-read-pool.md (194 additions, 0 deletions)

# Summary

This RFC proposes a new thread pool that supports async operations, called `ReadPool`. It is
designed specifically for all kinds of read operations, such as KV Get, KV Scan, and Coprocessor
Read, to improve performance.

# Motivation

Currently, all KV operations inside `Storage` are executed on the same thread pool. This leads to
poor operation isolation, i.e. heavy writes cause slow reads. In fact, these read operations can be
executed on other threads so that they are not blocked. For Coprocessor operations, we have a
single end-point worker thread to handle async snapshots, which becomes a bottleneck when there are
heavy Coprocessor requests.

By introducing a standalone thread pool that supports async operations (which is called ReadPool) in
this RFC, we can fix both problems. For KV operations, read and write operations will be executed
on different thread pools, so that they won't affect each other. For Coprocessor operations, since
ReadPool supports async operations, the end-point worker is not necessary anymore and the bottleneck
no longer exists.

In this RFC, there will be a ReadPool for KV operations and another ReadPool for Coprocessor
operations, so that they won't block each other. In the future, it may be possible to merge
them together.

# Detailed design

The ReadPool provides these features:

1. Support different priorities (high, normal and low).

2. Support contexts and periodical ticking so that we can use local metrics.

3. Support a max task threshold as a load control mechanism.

The ReadPool can be implemented in two levels. The lower level is a new facility called
`util::FuturePool`, which is a simple encapsulation over [CpuPool](https://docs.rs/futures-cpupool/)
that supports feature 2. The higher level is `server::ReadPool`, which assembles multiple
`FuturePool`s and supports features 1 and 3. In this way, `FuturePool` becomes a common facility
that can be reused in the future to run other kinds of async operations that need contexts, not
limited to the scenarios listed in this RFC.

## FuturePool

To support periodical ticking, this RFC defines:

```rust
pub trait Context: fmt::Debug + Send {
    fn on_tick(&mut self) {}
}
```

The `FuturePool` user can provide a customized `impl Context` so that some code can be executed
periodically within the context of the thread pool's thread.
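
For example (a hypothetical context, not something this RFC defines), an implementation might
accumulate thread-local metrics and flush them on each tick:

```rust
#[derive(Debug)]
struct ReadContext {
    // Number of requests handled on this thread since the last tick.
    processed_requests: usize,
}

impl Context for ReadContext {
    fn on_tick(&mut self) {
        // Flush the locally accumulated value into global metrics here,
        // then reset the local counter.
        self.processed_requests = 0;
    }
}
```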

The `FuturePool` is defined as follows:

```rust
pub struct FuturePool<T: Context + 'static> {
    pool: CpuPool,
    context: Arc<HashMap<thread::ThreadId, T>>,
    running_task_count: Arc<AtomicUsize>,
    // ...
}
```

The logic for deciding whether to call `on_tick` is: when each task finishes running, we check how
much time has elapsed since the last tick. If it is longer than our tick interval, we call
`on_tick`. To do this in a clean way, we can wrap the logic into a `ContextDelegator`:

```rust
struct ContextDelegator<T: Context> {
    tick_interval: Duration,
    inner: RefCell<T>,
    last_tick: Cell<Option<Instant>>,
}

impl<T: Context> ContextDelegator<T> {
    fn new(context: T, tick_interval: Duration) -> ContextDelegator<T> {
        // ...
    }

    fn on_task_finish(&self) {
        // check and optionally call `self.inner.borrow_mut().on_tick()`
    }
}
```
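
A minimal sketch of how that check might look, assuming the timer is armed by the first finished
task (this exact first-tick behavior is an assumption, not fixed by this RFC):

```rust
// Inside `impl<T: Context> ContextDelegator<T>`:
fn on_task_finish(&self) {
    let now = Instant::now();
    let should_tick = match self.last_tick.get() {
        // The first finished task only arms the timer; it does not tick.
        None => {
            self.last_tick.set(Some(now));
            return;
        }
        Some(last) => now.duration_since(last) >= self.tick_interval,
    };
    if should_tick {
        self.last_tick.set(Some(now));
        self.inner.borrow_mut().on_tick();
    }
}
```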

`FuturePool` users pass a `Future` to the `FuturePool` to execute, just like with `CpuPool`.
However, to obtain access to the `Context` inside the `Future`, the provided `Future` should be
wrapped in a closure which provides access to the `Context` via its parameter. In the future, the
parameter may live across multiple `Future`s, which may run on different threads and thus on
different `Context`s. Thus, the parameter is a `Context` accessor instead of a specific `Context`.

As a result, this RFC further introduces the following struct, which provides access to the
current `Context` based on the running thread:

```rust
pub struct ContextDelegators<T: Context> {
    // ...
}

impl<T: Context> ContextDelegators<T> {
    pub fn current_thread_context_mut(&self) -> RefMut<T> {
        // ...
    }
}
```

The `FuturePool` provides a `spawn` interface to execute a `Future` while providing the
`ContextDelegators`:

```rust
impl<T: Context + 'static> FuturePool<T> {
    pub fn spawn<F, R>(&self, future_factory: R) -> CpuFuture<F::Item, F::Error>
    where
        R: FnOnce(ContextDelegators<T>) -> F + Send + 'static,
        F: Future + Send + 'static,
        F::Item: Send + 'static,
        F::Error: Send + 'static,
    {
        // ...
    }
}
```
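
As a usage sketch (assuming `future_pool` is a `FuturePool<ReadContext>` built with the hypothetical
context shown earlier), the caller hands `spawn` a closure that builds the `Future`; the context is
only looked up inside the future, i.e. on the thread that actually runs it:

```rust
// Hypothetical call site, using the futures 0.1 API that CpuPool is built on.
let cpu_future = future_pool.spawn(|delegators| {
    futures::future::lazy(move || {
        // Runs on a pool thread, so this reaches that thread's own context.
        delegators.current_thread_context_mut().processed_requests += 1;
        // ... perform the actual read and produce its result here ...
        futures::future::ok::<_, ()>(())
    })
});
```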

## ReadPool

`ReadPool` is implemented over `FuturePool` as follows:

```rust
pub struct ReadPool<T: Context + 'static> {
    pool_high: FuturePool<T>,
    pool_normal: FuturePool<T>,
    pool_low: FuturePool<T>,
    max_tasks_high: usize,
    max_tasks_normal: usize,
    max_tasks_low: usize,
}
```

It also provides an interface similar to `FuturePool::spawn`, which additionally takes a priority
and returns an error when the corresponding `FuturePool` is full:

```rust
pub enum Priority {
    Normal,
    Low,
    High,
}

impl<T: futurepool::Context + 'static> ReadPool<T> {
    pub fn future_execute<F, R>(
        &self,
        priority: Priority,
        future_factory: R,
    ) -> Result<CpuFuture<F::Item, F::Error>, Full>
    where
        R: FnOnce(ContextDelegators<T>) -> F + Send + 'static,
        F: Future + Send + 'static,
        F::Item: Send + 'static,
        F::Error: Send + 'static,
    {
        let pool = self.get_pool_by_priority(priority);
        let max_tasks = self.get_max_tasks_by_priority(priority);
        let current_tasks = pool.get_running_task_count();
        if current_tasks >= max_tasks {
            Err(Full {
                current_tasks,
                max_tasks,
            })
        } else {
            Ok(pool.spawn(future_factory))
        }
    }
}
```
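
The body above relies on `get_pool_by_priority`, `get_max_tasks_by_priority` and
`FuturePool::get_running_task_count`, which this RFC does not spell out. A minimal sketch of the
two `ReadPool` helpers, assuming they simply map the priority to the fields shown earlier:

```rust
impl<T: futurepool::Context + 'static> ReadPool<T> {
    // Note: for the two by-value calls in `future_execute`, `Priority` would
    // need to derive `Copy` (or these helpers would take it by reference).
    fn get_pool_by_priority(&self, priority: Priority) -> &FuturePool<T> {
        match priority {
            Priority::High => &self.pool_high,
            Priority::Normal => &self.pool_normal,
            Priority::Low => &self.pool_low,
        }
    }

    fn get_max_tasks_by_priority(&self, priority: Priority) -> usize {
        match priority {
            Priority::High => self.max_tasks_high,
            Priority::Normal => self.max_tasks_normal,
            Priority::Low => self.max_tasks_low,
        }
    }
}
```

A caller picks a priority per request and handles the `Full` error by rejecting the request (for
example, replying that the server is busy) instead of queueing it indefinitely.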

# Drawbacks

The `on_tick` implementation in `FuturePool` is not perfect. It is driven by task completion, which
means it will not be executed when there is no task. This is acceptable since we are going to use
`on_tick` to report metrics.

# Alternatives

We can implement our own future pool instead of utilizing `CpuPool`. In this way, we can have a
true `on_tick`. However, this is complex, and a precise `on_tick` is not a feature we actually need.

We can implement our own async model instead of utilizing `Future`. However, there are no benefits
compared to the current solution.

# Unresolved questions

None.

text/2018-10-9-reduce-one-copy-in-grpc-send.md (94 additions, 0 deletions)

# Summary

Reduce one data copy in the process of sending messages in grpcio.
This is not a new feature but an optimization.

# Motivation

In pingcap/grpc-rs (which is named [`grpcio`](https://docs.rs/crate/grpcio/)
on crates.io), we can call gRPC's core API to send data.
In the old implementation, the whole process was:

0. User code produces the data to be sent (in particular, via the protobuf library's
serialization) -- a list of `&[u8]` delivered in a callback
0. The data gets copied into a `Vec<u8>`. For the protobuf library, this is done
internally (**copy 0**)
```rust
pub fn bin_ser(t: &Vec<u8>, buf: &mut Vec<u8>) {
    buf.extend_from_slice(t)
}
```
0. We create a `grpc_slice` (a ref-counted single string) by copying the
`Vec<u8>` mentioned above (**copy 1**)
0. We create a `grpc_byte_buffer` (a ref-counted list of strings) from only
one `grpc_slice`
0. Send data

In total, we copy the data twice. The first copy is unnecessary.

# Detailed design

## Basic concepts

First, note that Rust and C have different naming conventions:

+ `grpc_slice` on C side is renamed to `GrpcSlice` on Rust side
+ `grpc_byte_buffer` on C side is renamed to `GrpcByteBuffer` on Rust side

So don't be confused by the naming difference.

## Changes

After this refactoring, the process is:

0. User code produces the data to be sent (in particular, via the protobuf library's
serialization) -- a list of `&[u8]`
0. We copy each produced `&[u8]` into a `GrpcSlice` and collect these slices into a
`GrpcByteBuffer` wrapper (**copy 0**)
```rust
pub fn push(&mut self, slice: GrpcSlice) {
    unsafe {
        grpc_sys::grpcwrap_byte_buffer_add(self.raw as _, slice);
    }
}
```
0. We take the raw C struct pointer out of the `GrpcByteBuffer` and pass it
directly to the send functions
```rust
pub unsafe fn take_raw(&mut self) -> *mut grpc_sys::GrpcByteBuffer {
    let ret = self.raw;
    self.raw = grpc_sys::grpc_raw_byte_buffer_create(ptr::null_mut(), 0);
    ret
}
```
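
To make the copy counts concrete, here is a plain-Rust analogy of the two paths, deliberately using
`Vec<u8>` in place of the real C-backed `grpc_slice` / `grpc_byte_buffer` types (so this only
illustrates where copies happen and is not actual grpcio code):

```rust
/// Old path analogy: chunks are first merged into one intermediate Vec<u8> (copy 0),
/// which is then copied again into the single buffer entry (copy 1).
fn old_path(chunks: &[&[u8]]) -> Vec<Vec<u8>> {
    let mut intermediate = Vec::new();
    for chunk in chunks {
        intermediate.extend_from_slice(chunk); // copy 0: into the intermediate Vec<u8>
    }
    let slice = intermediate.clone(); // copy 1: Vec<u8> -> "grpc_slice"
    vec![slice] // the "grpc_byte_buffer" holding a single slice
}

/// New path analogy: each chunk is copied exactly once into its own entry,
/// and the entries are collected without any further copy.
fn new_path(chunks: &[&[u8]]) -> Vec<Vec<u8>> {
    chunks.iter().map(|chunk| chunk.to_vec()).collect() // copy 0 only
}
```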

# Drawbacks

This is purely an optimization; there are no drawbacks.
The reasons why one copy still remains:

+ Rust manages the lifetime of its own allocated memory, and we are not able
to disable this
+ The C side uses a ref-count mechanism to manage the lifetime of objects
+ We either need to extend the Rust objects' lifetime or do a copy (see the
unresolved questions section below)

# Unresolved questions

Actually, it is possible to extend the Rust objects' lifetime.
This can achieve real zero-copy.

## Zero copy

The basic idea is to extend the data's lifetime by moving it into our context
object, which is destroyed automatically when one send is complete.
Then we mark this data (which is stored in a `grpc_slice`) as static on the
C side, so the ref-count system won't deallocate it.

### Result

A gRPC feature called buffer-hint, which requires the data to have a much longer lifetime, has
prevented me from accepting this idea. It is too hard to decide how to manage the lifetime in that
case: since it is hard to estimate when the data is no longer needed on the C side, we would have
to do a lot of thread synchronization and `if`s.
