Skip to content

Commit

Permalink
Parity runtime moved to parity common for publication in crates.io (#271
Browse files Browse the repository at this point in the history
)

* Replace `tokio_core` with `tokio` (`ring` -> 0.13) (#9657)

* Replace `tokio_core` with `tokio`.

* Remove `tokio-core` and replace with `tokio` in

    - `ethcore/stratum`

    - `secret_store`

    - `util/fetch`

    - `util/reactor`

* Bump hyper to 0.12 in

    - `miner`

    - `util/fake-fetch`

    - `util/fetch`

    - `secret_store`

* Bump `jsonrpc-***` to 0.9 in

    - `parity`

    - `ethcore/stratum`

    - `ipfs`

    - `rpc`

    - `rpc_client`

    - `whisper`

* Bump `ring` to 0.13

* Use a more graceful shutdown process in `secret_store` tests.

* Convert some mutexes to rwlocks in `secret_store`.

* Consolidate Tokio Runtime use, remove `CpuPool`.

* Rename and move the `tokio_reactor` crate (`util/reactor`) to
  `tokio_runtime` (`util/runtime`).

* Rename `EventLoop` to `Runtime`.

    - Rename `EventLoop::spawn` to `Runtime::with_default_thread_count`.

    - Add the `Runtime::with_thread_count` method.

    - Rename `Remote` to `Executor`.

* Remove uses of `CpuPool` and spawn all tasks via the `Runtime` executor
  instead.

* Other changes related to `CpuPool` removal:

    - Remove `Reservations::with_pool`. `::new` now takes an `Executor` as an argument.

    - Remove `SenderReservations::with_pool`. `::new` now takes an `Executor` as an argument.

* Remove secret_store runtimes. (#9888)

* Remove the independent runtimes from `KeyServerHttpListener` and
  `KeyServerCore` and instead require a `parity_runtime::Executor`
  to be passed upon creation of each.

* Remove the `threads` parameter from both `ClusterConfiguration` structs.

* Implement the `future::Executor` trait for `parity_runtime::Executor`.

* Update tests.
  - Update the `loop_until` function to instead use a oneshot to signal
    completion.
  - Modify the `make_key_servers` function to create and return a runtime.

*  misc: bump license header to 2019 (#10135)

* misc: bump license header to 2019

* misc: remove_duplicate_empty_lines.sh

* misc: run license header script

* commit cargo lock

* Upgrade to jsonrpc v14 (#11151)

* Upgrade to jsonrpc v14

Contains paritytech/jsonrpc#495 with good bugfixes to resource usage.

* Bump tokio & futures.

* Bump even further.

* Upgrade tokio to 0.1.22

* Partially revert "Bump tokio & futures."

This reverts commit 100907eb91907aa124d856d52374637256118e86.

* Added README, CHANGELOG and several meta tags in Cargo.toml

* Proper pr in changelog

* Remove categories tag

* Comments and usage fixed

* Declare test usage for methods explicitly

* Crate name in readme modified, complete removed

* Test helpers feature added, functions marked as test only

* Processed by fmt tool

* Illustrative example added

* Sample moved into the separate directory

* use examples directory instead of custom crate

* Wait till scanning completed

* Timeout decreased

* Unused methods removed

Co-authored-by: Nick Sanders <c0gent@users.noreply.github.com>
Co-authored-by: 5chdn <5chdn@users.noreply.github.com>
Co-authored-by: David <dvdplm@gmail.com>
Co-authored-by: Nikolay Volf <nikvolf@gmail.com>
  • Loading branch information
5 people authored Feb 11, 2020
1 parent 7428275 commit fc93b7e
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ members = [
"parity-path",
"plain_hasher",
"rlp",
"runtime",
"transaction-pool",
"trace-time",
"triehash",
Expand Down
11 changes: 11 additions & 0 deletions runtime/CHANGELOG.MD
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Changelog

The format is based on [Keep a Changelog].

[Keep a Changelog]: http://keepachangelog.com/en/1.0.0/

## [Unreleased]

## [0.1.1] - 2019-11-25
### Changed
- Moved to parity common repo, prepared for publishing (https://github.com/paritytech/parity-common/pull/271)
19 changes: 19 additions & 0 deletions runtime/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "parity-runtime"
version = "0.1.1"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"

description = "Tokio runtime wrapper"
license = "GPL-3.0"
readme = "README.md"
homepage = "https://www.parity.io/"
keywords = ["parity", "runtime", "tokio"]
include = ["Cargo.toml", "src/**/*.rs", "README.md", "CHANGELOG.md"]

[dependencies]
futures = "0.1"
tokio = "0.1.22"

[features]
test-helpers = []
6 changes: 6 additions & 0 deletions runtime/README.MD
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# parity-runtime

Wrapper over tokio runtime. Provides:
- Customizable runtime with ability to spawn it in different thread models
- Corresponding runtime executor for tasks
- Runtime handle
41 changes: 41 additions & 0 deletions runtime/examples/simple.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2015-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.

// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.

//! Simple example, illustating usage of runtime wrapper.
use futures::{Future, Stream};
use parity_runtime::Runtime;
use std::thread::park_timeout;
use std::time::Duration;
use tokio::fs::read_dir;

/// Read current directory in a future, which is executed in the created runtime
fn main() {
let fut = read_dir(".")
.flatten_stream()
.for_each(|dir| {
println!("{:?}", dir.path());
Ok(())
})
.map_err(|err| {
eprintln!("Error: {:?}", err);
()
});
let runtime = Runtime::with_default_thread_count();
runtime.executor().spawn(fut);
let timeout = Duration::from_secs(3);
park_timeout(timeout);
}
198 changes: 198 additions & 0 deletions runtime/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Copyright 2015-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.

// Parity Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.

//! Tokio Runtime wrapper.
use futures::{future, Future, IntoFuture};
use std::sync::mpsc;
use std::{fmt, thread};
pub use tokio::runtime::{Builder as TokioRuntimeBuilder, Runtime as TokioRuntime, TaskExecutor};
pub use tokio::timer::Delay;

/// Runtime for futures.
///
/// Runs in a separate thread.
pub struct Runtime {
executor: Executor,
handle: RuntimeHandle,
}

impl Runtime {
fn new(runtime_bldr: &mut TokioRuntimeBuilder) -> Self {
let mut runtime = runtime_bldr.build().expect(
"Building a Tokio runtime will only fail when mio components \
cannot be initialized (catastrophic)",
);
let (stop, stopped) = futures::oneshot();
let (tx, rx) = mpsc::channel();
let handle = thread::spawn(move || {
tx.send(runtime.executor()).expect("Rx is blocking upper thread.");
runtime
.block_on(futures::empty().select(stopped).map(|_| ()).map_err(|_| ()))
.expect("Tokio runtime should not have unhandled errors.");
});
let executor = rx.recv().expect("tx is transfered to a newly spawned thread.");

Runtime {
executor: Executor { inner: Mode::Tokio(executor) },
handle: RuntimeHandle { close: Some(stop), handle: Some(handle) },
}
}

/// Spawns a new tokio runtime with a default thread count on a background
/// thread and returns a `Runtime` which can be used to spawn tasks via
/// its executor.
pub fn with_default_thread_count() -> Self {
let mut runtime_bldr = TokioRuntimeBuilder::new();
Self::new(&mut runtime_bldr)
}

/// Spawns a new tokio runtime with a the specified thread count on a
/// background thread and returns a `Runtime` which can be used to spawn
/// tasks via its executor.
#[cfg(any(test, feature = "test-helpers"))]
pub fn with_thread_count(thread_count: usize) -> Self {
let mut runtime_bldr = TokioRuntimeBuilder::new();
runtime_bldr.core_threads(thread_count);

Self::new(&mut runtime_bldr)
}

/// Returns this runtime raw executor.
#[cfg(any(test, feature = "test-helpers"))]
pub fn raw_executor(&self) -> TaskExecutor {
if let Mode::Tokio(ref executor) = self.executor.inner {
executor.clone()
} else {
panic!("Runtime is not initialized in Tokio mode.")
}
}

/// Returns runtime executor.
pub fn executor(&self) -> Executor {
self.executor.clone()
}
}

#[derive(Clone)]
enum Mode {
Tokio(TaskExecutor),
// Mode used in tests
#[allow(dead_code)]
Sync,
// Mode used in tests
#[allow(dead_code)]
ThreadPerFuture,
}

impl fmt::Debug for Mode {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
use self::Mode::*;

match *self {
Tokio(_) => write!(fmt, "tokio"),
Sync => write!(fmt, "synchronous"),
ThreadPerFuture => write!(fmt, "thread per future"),
}
}
}

#[derive(Debug, Clone)]
pub struct Executor {
inner: Mode,
}

impl Executor {
/// Synchronous executor, used for tests.
#[cfg(any(test, feature = "test-helpers"))]
pub fn new_sync() -> Self {
Executor { inner: Mode::Sync }
}

/// Spawns a new thread for each future (use only for tests).
#[cfg(any(test, feature = "test-helpers"))]
pub fn new_thread_per_future() -> Self {
Executor { inner: Mode::ThreadPerFuture }
}

/// Spawn a future on this runtime
pub fn spawn<R>(&self, r: R)
where
R: IntoFuture<Item = (), Error = ()> + Send + 'static,
R::Future: Send + 'static,
{
match self.inner {
Mode::Tokio(ref executor) => executor.spawn(r.into_future()),
Mode::Sync => {
let _ = r.into_future().wait();
}
Mode::ThreadPerFuture => {
thread::spawn(move || {
let _ = r.into_future().wait();
});
}
}
}
}

impl<F: Future<Item = (), Error = ()> + Send + 'static> future::Executor<F> for Executor {
fn execute(&self, future: F) -> Result<(), future::ExecuteError<F>> {
match self.inner {
Mode::Tokio(ref executor) => executor.execute(future),
Mode::Sync => {
let _ = future.wait();
Ok(())
}
Mode::ThreadPerFuture => {
thread::spawn(move || {
let _ = future.wait();
});
Ok(())
}
}
}
}

/// A handle to a runtime. Dropping the handle will cause runtime to shutdown.
pub struct RuntimeHandle {
close: Option<futures::sync::oneshot::Sender<()>>,
handle: Option<thread::JoinHandle<()>>,
}

impl From<Runtime> for RuntimeHandle {
fn from(el: Runtime) -> Self {
el.handle
}
}

impl Drop for RuntimeHandle {
fn drop(&mut self) {
self.close.take().map(|v| v.send(()));
}
}

impl RuntimeHandle {
/// Blocks current thread and waits until the runtime is finished.
pub fn wait(mut self) -> thread::Result<()> {
self.handle.take().expect("Handle is taken only in `wait`, `wait` is consuming; qed").join()
}

/// Finishes this runtime.
pub fn close(mut self) {
let _ =
self.close.take().expect("Close is taken only in `close` and `drop`. `close` is consuming; qed").send(());
}
}

0 comments on commit fc93b7e

Please sign in to comment.