diff --git a/Cargo.lock b/Cargo.lock index ff22b921c58a..e73cda88dc11 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -79,7 +79,7 @@ dependencies = [ "lazy_static", "log", "mime", - "percent-encoding 2.1.0", + "percent-encoding", "pin-project", "rand 0.7.3", "regex", @@ -542,7 +542,7 @@ dependencies = [ "async-std", "byte-pool", "futures-core", - "http-types 2.4.0", + "http-types", "httparse", "lazy_static", "log", @@ -622,19 +622,6 @@ dependencies = [ "syn 1.0.38", ] -[[package]] -name = "async-sse" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6de73294dd1287b32b10f4c884186446353048f183071dff38c1481f82c053b3" -dependencies = [ - "async-std", - "http-types 1.3.1", - "log", - "memchr", - "pin-project-lite", -] - [[package]] name = "async-std" version = "1.6.3" @@ -705,6 +692,19 @@ dependencies = [ "syn 1.0.38", ] +[[package]] +name = "async-tungstenite" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5c45a0dd44b7e6533ac4e7acc38ead1a3b39885f5bbb738140d30ea528abc7c" +dependencies = [ + "futures-io", + "futures-util", + "log", + "pin-project", + "tungstenite", +] + [[package]] name = "atomic" version = "0.4.6" @@ -750,7 +750,7 @@ dependencies = [ "futures-core", "log", "mime", - "percent-encoding 2.1.0", + "percent-encoding", "rand 0.7.3", "serde", "serde_json", @@ -1245,6 +1245,7 @@ dependencies = [ "forest_crypto", "forest_encoding", "forest_message", + "futures 0.3.5", "ipld_amt", "ipld_blockstore", "lazy_static", @@ -1491,15 +1492,6 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" -[[package]] -name = "cookie" -version = "0.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "888604f00b3db336d2af898ec3c1d5d0ddf5e6d462220f2ededc33a87ac4bbd5" -dependencies = [ - "time 0.1.43", -] - [[package]] name = "cookie" version = "0.14.2" @@ -1510,7 +1502,7 @@ dependencies = [ "base64 0.12.3", "hkdf 0.9.0", "hmac 0.8.1", - "percent-encoding 2.1.0", + "percent-encoding", "rand 0.7.3", "sha2 0.9.1", "time 0.2.16", @@ -2149,22 +2141,6 @@ version = "1.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4bd3bdaaf0a72155260a1c098989b60db1cbb22d6a628e64f16237aa4da93cc7" -[[package]] -name = "femme" -version = "2.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2af1a24f391a5a94d756db5092c6576aad494b88a71a5a36b20c67b63e0df034" -dependencies = [ - "cfg-if", - "js-sys", - "log", - "serde", - "serde_derive", - "serde_json", - "wasm-bindgen", - "web-sys", -] - [[package]] name = "ff-cl-gen" version = "0.1.3" @@ -3185,7 +3161,7 @@ dependencies = [ "async-native-tls", "async-std", "futures 0.3.5", - "http-types 2.4.0", + "http-types", "isahc", "js-sys", "log", @@ -3194,23 +3170,6 @@ dependencies = [ "web-sys", ] -[[package]] -name = "http-types" -version = "1.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49d9f44462c2e59d5d5826e7ba74b121ee2fb0ff091ef849692694f1d77aaf50" -dependencies = [ - "anyhow", - "async-std", - "cookie 0.12.0", - "infer", - "omnom", - "pin-project-lite", - "serde", - "serde_json", - "url", -] - [[package]] name = "http-types" version = "2.4.0" @@ -3219,13 +3178,13 @@ checksum = "bb4daf8dc001485f4a32a7a17c54c67fa8a10340188f30ba87ac0fe1a9451e97" dependencies = [ "anyhow", "async-std", - "cookie 0.14.2", + "cookie", "infer", "pin-project-lite", "rand 0.7.3", "serde", "serde_json", - "serde_qs 0.6.1", + "serde_qs", "serde_urlencoded", "url", ] @@ -3334,6 +3293,15 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6854dd77ddc4f9ba1a448f487e27843583d407648150426a30c2ea3a2c39490a" +[[package]] +name = "input_buffer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19a8a95243d5a0398cae618ec29477c6e3cb631152be5c19481f80bc71559754" +dependencies = [ + "bytes 0.5.6", +] + [[package]] name = "instant" version = "0.1.6" @@ -4742,15 +4710,6 @@ dependencies = [ "num", ] -[[package]] -name = "omnom" -version = "2.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6b216cee2e0d6e680f73158d15468c80b39e571c11669cd90556f9a644e9fd3" -dependencies = [ - "memchr", -] - [[package]] name = "once_cell" version = "1.4.1" @@ -4854,7 +4813,7 @@ dependencies = [ "byteorder 1.3.4", "data-encoding", "multihash 0.11.2", - "percent-encoding 2.1.0", + "percent-encoding", "serde", "static_assertions", "unsigned-varint 0.4.0", @@ -4996,12 +4955,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" -[[package]] -name = "percent-encoding" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" - [[package]] name = "percent-encoding" version = "2.1.0" @@ -5591,7 +5544,7 @@ dependencies = [ "mime", "mime_guess", "native-tls", - "percent-encoding 2.1.0", + "percent-encoding", "pin-project-lite", "serde", "serde_urlencoded", @@ -5639,24 +5592,21 @@ dependencies = [ "librocksdb-sys", ] -[[package]] -name = "route-recognizer" -version = "0.1.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea509065eb0b3c446acdd0102f0d46567dc30902dc0be91d6552035d92b0f4f8" - [[package]] name = "rpc" version = "0.1.0" dependencies = [ "actor", + "async-log", "async-std", + "async-tungstenite", "bitfield", "chain", "chain_sync", "clock", "db", "fil_types", + "flo_stream", "forest_address", "forest_bigint", "forest_blocks", @@ -5671,6 +5621,7 @@ dependencies = [ "ipld_blockstore", "jsonrpc-v2", "key_management", + "log", "message_pool", "num-traits 0.2.12", "rand 0.7.3", @@ -5681,7 +5632,6 @@ dependencies = [ "state_tree", "test_utils", "thiserror", - "tide", ] [[package]] @@ -5983,18 +5933,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_qs" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d43eef44996bbe16e99ac720e1577eefa16f7b76b5172165c98ced20ae9903e1" -dependencies = [ - "data-encoding", - "error-chain", - "percent-encoding 1.0.1", - "serde", -] - [[package]] name = "serde_qs" version = "0.6.1" @@ -6003,7 +5941,7 @@ checksum = "c6f3acf84e23ab27c01cb5917551765c01c50b2000089db8fa47fe018a3260cf" dependencies = [ "data-encoding", "error-chain", - "percent-encoding 2.1.0", + "percent-encoding", "serde", ] @@ -6645,7 +6583,7 @@ dependencies = [ "encoding_rs", "futures 0.3.5", "http-client", - "http-types 2.4.0", + "http-types", "log", "mime", "mime_guess", @@ -6820,26 +6758,6 @@ dependencies = [ "num_cpus", ] -[[package]] -name = "tide" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9620c27936f084db0613d82507e3c6e8be64cb1a26e0c721a2796b5a8c7a515" -dependencies = [ - "async-h1", - "async-sse", - "async-std", - "femme", - "http-types 2.4.0", - "kv-log-macro", - "mime", - "mime_guess", - "route-recognizer", - "serde", - "serde_json", - "serde_qs 0.5.2", -] - [[package]] name = "time" version = "0.1.43" @@ -7131,6 +7049,25 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" +[[package]] +name = "tungstenite" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0308d80d86700c5878b9ef6321f020f29b1bb9d5ff3cab25e75e23f3a492a23" +dependencies = [ + "base64 0.12.3", + "byteorder 1.3.4", + "bytes 0.5.6", + "http", + "httparse", + "input_buffer", + "log", + "rand 0.7.3", + "sha-1 0.9.1", + "url", + "utf-8", +] + [[package]] name = "twofish" version = "0.2.0" @@ -7261,10 +7198,16 @@ checksum = "829d4a8476c35c9bf0bbce5a3b23f4106f79728039b726d292bb93bc106787cb" dependencies = [ "idna", "matches", - "percent-encoding 2.1.0", + "percent-encoding", "serde", ] +[[package]] +name = "utf-8" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7" + [[package]] name = "utils" version = "0.1.0" diff --git a/blockchain/beacon/Cargo.toml b/blockchain/beacon/Cargo.toml index 927b35a21829..789a245588aa 100644 --- a/blockchain/beacon/Cargo.toml +++ b/blockchain/beacon/Cargo.toml @@ -9,7 +9,7 @@ features = ["json"] [dependencies] ahash = "0.4" -async-std = { version = "1.6.0", features = ["unstable"] } +async-std = { version = "1.6.3", features = ["unstable"] } clock = { path = "../../node/clock" } bls-signatures = "0.6.0" serde = { version = "1.0", features = ["derive"] } @@ -24,7 +24,7 @@ hex = "0.4.2" [dev-dependencies] base64 = "0.12.1" -async-std = { version = "1.6.0", features = ["unstable", "attributes"] } +async-std = { version = "1.6.3", features = ["unstable", "attributes"] } serde_json = "1.0" [build-dependencies] diff --git a/blockchain/blocks/src/tipset.rs b/blockchain/blocks/src/tipset.rs index 28b49cd6dc8c..6fb99480d100 100644 --- a/blockchain/blocks/src/tipset.rs +++ b/blockchain/blocks/src/tipset.rs @@ -271,7 +271,7 @@ pub mod tipset_json { use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; /// Wrapper for serializing and deserializing a SignedMessage from JSON. - #[derive(Deserialize, Serialize, Debug)] + #[derive(Deserialize, Serialize)] #[serde(transparent)] pub struct TipsetJson(#[serde(with = "self")] pub Tipset); @@ -292,6 +292,12 @@ pub mod tipset_json { } } + impl<'a> From<&'a Tipset> for TipsetJsonRef<'a> { + fn from(wrapper: &'a Tipset) -> Self { + TipsetJsonRef(wrapper) + } + } + pub fn serialize(m: &Tipset, serializer: S) -> Result where S: Serializer, @@ -299,10 +305,10 @@ pub mod tipset_json { #[derive(Serialize)] #[serde(rename_all = "PascalCase")] struct TipsetSer<'a> { - #[serde(with = "super::super::header::json::vec")] - blocks: &'a [BlockHeader], #[serde(with = "super::tipset_keys_json")] cids: &'a TipsetKeys, + #[serde(with = "super::super::header::json::vec")] + blocks: &'a [BlockHeader], height: ChainEpoch, } TipsetSer { @@ -320,10 +326,10 @@ pub mod tipset_json { #[derive(Serialize, Deserialize)] #[serde(rename_all = "PascalCase")] struct TipsetDe { - #[serde(with = "super::super::header::json::vec")] - blocks: Vec, #[serde(with = "super::tipset_keys_json")] cids: TipsetKeys, + #[serde(with = "super::super::header::json::vec")] + blocks: Vec, height: ChainEpoch, } let TipsetDe { blocks, .. } = Deserialize::deserialize(deserializer)?; diff --git a/blockchain/chain/Cargo.toml b/blockchain/chain/Cargo.toml index 04f36609a3a9..816f78073150 100644 --- a/blockchain/chain/Cargo.toml +++ b/blockchain/chain/Cargo.toml @@ -26,9 +26,14 @@ byteorder = "1.3.4" beacon = { path = "../beacon" } flo_stream = "0.4.0" address = { package = "forest_address", path = "../../vm/address" } -lazy_static = "1.4" +futures = "0.3.5" async-std = "1.6.3" types = { package = "fil_types", path = "../../types" } +lazy_static = "1.4" + + +[features] +json = [] [dev-dependencies] multihash = "0.10.0" diff --git a/blockchain/chain/src/store/chain_store.rs b/blockchain/chain/src/store/chain_store.rs index 1abe3f60afcd..b0c3b8efe56a 100644 --- a/blockchain/chain/src/store/chain_store.rs +++ b/blockchain/chain/src/store/chain_store.rs @@ -5,6 +5,7 @@ use super::{Error, TipIndex, TipsetMetadata}; use actor::{power::State as PowerState, STORAGE_POWER_ACTOR_ADDR}; use address::Address; use async_std::sync::RwLock; +use async_std::task; use beacon::{BeaconEntry, IGNORE_DRAND_VAR}; use blake2b_simd::Params; use blocks::{Block, BlockHeader, FullTipset, Tipset, TipsetKeys, TxMeta}; @@ -15,9 +16,10 @@ use clock::ChainEpoch; use crypto::DomainSeparationTag; use encoding::{blake2b_256, de::DeserializeOwned, from_slice, Cbor}; use flo_stream::{MessagePublisher, Publisher, Subscriber}; +use futures::{future, StreamExt}; use ipld_amt::Amt; use ipld_blockstore::BlockStore; -use log::{debug, info, warn}; +use log::{info, warn}; use message::{ChainMessage, Message, MessageReceipt, SignedMessage, UnsignedMessage}; use num_bigint::BigInt; use num_traits::Zero; @@ -50,6 +52,31 @@ pub enum HeadChange { Revert(Arc), } +#[derive(Debug, Clone)] +pub struct IndexToHeadChange(pub usize, pub HeadChange); + +#[derive(Clone)] +pub enum EventsPayload { + TaskCancel(usize, ()), + SubHeadChanges(IndexToHeadChange), +} + +impl EventsPayload { + pub fn sub_head_changes(&self) -> Option<&IndexToHeadChange> { + match self { + EventsPayload::SubHeadChanges(s) => Some(s), + _ => None, + } + } + + pub fn task_cancel(&self) -> Option<(usize, ())> { + match self { + EventsPayload::TaskCancel(val, _) => Some((*val, ())), + _ => None, + } + } +} + /// Stores chain data such as heaviest tipset and cached tipset info at each epoch. /// This structure is threadsafe, and all caches are wrapped in a mutex to allow a consistent /// `ChainStore` to be shared across tasks. @@ -63,7 +90,7 @@ pub struct ChainStore { heaviest: RwLock>>, // tip_index tracks tipsets by epoch/parentset for use by expected consensus. - tip_index: TipIndex, + tip_index: RwLock, } impl ChainStore @@ -77,9 +104,9 @@ where .map(Arc::new); Self { db, - publisher: Publisher::new(SINK_CAP).into(), - tip_index: TipIndex::new(), - heaviest: heaviest.into(), + publisher: RwLock::new(Publisher::new(SINK_CAP)), + tip_index: RwLock::new(TipIndex::new()), + heaviest: RwLock::new(heaviest), } } @@ -110,7 +137,7 @@ where tipset_receipts_root: header.message_receipts().clone(), tipset: ts, }; - self.tip_index.put(&meta).await; + self.tip_index.write().await.put(&meta).await; Ok(()) } @@ -160,6 +187,13 @@ where self.heaviest.read().await.clone() } + pub fn tip_index(&self) -> &RwLock { + &self.tip_index + } + + pub fn publisher(&self) -> &RwLock> { + &self.publisher + } /// Returns key-value store instance pub fn blockstore(&self) -> &DB { &self.db @@ -172,28 +206,7 @@ where /// Constructs and returns a full tipset if messages from storage exists pub fn fill_tipsets(&self, ts: Tipset) -> Result { - let mut blocks: Vec = Vec::with_capacity(ts.blocks().len()); - - for header in ts.into_blocks() { - let (bls_messages, secp_messages) = block_messages(self.blockstore(), &header)?; - debug!( - "Fill Tipsets for header {:?} with bls_messages: {:?}", - header.cid(), - bls_messages - .iter() - .map(|b| b.cid().unwrap()) - .collect::>() - ); - - blocks.push(Block { - header, - bls_messages, - secp_messages, - }); - } - - // the given tipset has already been verified, so this cannot fail - Ok(FullTipset::new(blocks).unwrap()) + fill_tipsets(self.blockstore(), ts) } /// Determines if provided tipset is heavier than existing known heaviest tipset @@ -291,6 +304,26 @@ where Ok((bls_msgs, secp_msgs)) } +/// Constructs and returns a full tipset if messages from storage exists - non self version +pub fn fill_tipsets(db: &DB, ts: Tipset) -> Result +where + DB: BlockStore, +{ + let mut blocks: Vec = Vec::with_capacity(ts.blocks().len()); + + for header in ts.into_blocks() { + let (bls_messages, secp_messages) = block_messages(db, &header)?; + blocks.push(Block { + header, + bls_messages, + secp_messages, + }); + } + + // the given tipset has already been verified, so this cannot fail + Ok(FullTipset::new(blocks).unwrap()) +} + /// Returns a tuple of UnsignedMessage and SignedMessages from their Cid pub fn block_messages_from_cids( db: &DB, @@ -705,6 +738,77 @@ where Ok(out) } +pub async fn sub_head_changes( + mut subscribed_head_change: Subscriber, + heaviest_tipset: &Option>, + current_index: usize, + events_pubsub: Arc>>, +) -> Result { + let head = heaviest_tipset + .as_ref() + .ok_or_else(|| Error::Other("Could not get heaviest tipset".to_string()))?; + + (*events_pubsub.write().await) + .publish(EventsPayload::SubHeadChanges(IndexToHeadChange( + current_index, + HeadChange::Current(head.clone()), + ))) + .await; + let subhead_sender = events_pubsub.clone(); + let handle = task::spawn(async move { + while let Some(change) = subscribed_head_change.next().await { + let index_to_head_change = IndexToHeadChange(current_index, change); + subhead_sender + .write() + .await + .publish(EventsPayload::SubHeadChanges(index_to_head_change)) + .await; + } + }); + let cancel_sender = events_pubsub.write().await.subscribe(); + task::spawn(async move { + if let Some(EventsPayload::TaskCancel(_, ())) = cancel_sender + .filter(|s| { + future::ready( + s.task_cancel() + .map(|s| s.0 == current_index) + .unwrap_or_default(), + ) + }) + .next() + .await + { + handle.cancel().await; + } + }); + Ok(current_index) +} + +#[cfg(feature = "json")] +pub mod headchange_json { + use super::*; + use blocks::tipset_json::TipsetJsonRef; + use serde::Serialize; + + #[derive(Serialize)] + #[serde(rename_all = "lowercase")] + #[serde(tag = "type", content = "val")] + pub enum HeadChangeJson<'a> { + Current(TipsetJsonRef<'a>), + Apply(TipsetJsonRef<'a>), + Revert(TipsetJsonRef<'a>), + } + + impl<'a> From<&'a HeadChange> for HeadChangeJson<'a> { + fn from(wrapper: &'a HeadChange) -> Self { + match wrapper { + HeadChange::Current(tipset) => HeadChangeJson::Current(TipsetJsonRef(&tipset)), + HeadChange::Apply(tipset) => HeadChangeJson::Apply(TipsetJsonRef(&tipset)), + HeadChange::Revert(tipset) => HeadChangeJson::Revert(TipsetJsonRef(&tipset)), + } + } + } +} #[cfg(test)] mod tests { use super::*; diff --git a/blockchain/chain_sync/Cargo.toml b/blockchain/chain_sync/Cargo.toml index 0fea5f7a7403..50621aa1aaa6 100644 --- a/blockchain/chain_sync/Cargo.toml +++ b/blockchain/chain_sync/Cargo.toml @@ -23,7 +23,7 @@ state_manager = { path = "../state_manager/" } num-bigint = { path = "../../utils/bigint", package = "forest_bigint" } crypto = { package = "forest_crypto", path = "../../crypto" } log = "0.4.8" -async-std = { version = "1.6.0", features = ["unstable"] } +async-std = { version = "1.6.3", features = ["unstable"] } forest_libp2p = { path = "../../node/forest_libp2p" } futures = "0.3.5" futures-util = "0.3.5" diff --git a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index 5ffc642965a2..6c6fcd2b201c 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -102,10 +102,10 @@ where state: ChainSyncState::Bootstrap, worker_state: Default::default(), beacon, - state_manager, - chain_store, network, genesis, + state_manager, + chain_store, bad_blocks: Arc::new(BadBlockCache::default()), net_handler: network_rx, sync_queue: SyncBucketSet::default(), @@ -356,8 +356,8 @@ where return Err(Error::InvalidRoots); } - self.chain_store.put_messages(block.bls_msgs())?; - self.chain_store.put_messages(block.secp_msgs())?; + chain::persist_objects(self.chain_store.blockstore(), block.bls_msgs())?; + chain::persist_objects(self.state_manager.blockstore(), block.secp_msgs())?; Ok(()) } diff --git a/blockchain/message_pool/Cargo.toml b/blockchain/message_pool/Cargo.toml index 4980dbdd9964..0845fd9e740b 100644 --- a/blockchain/message_pool/Cargo.toml +++ b/blockchain/message_pool/Cargo.toml @@ -25,7 +25,8 @@ futures = "0.3.5" libsecp256k1 = "0.3.4" blake2b_simd = "0.5.10" log = "0.4.8" -async-std = "1.6.0" +async-std = "1.6.3" +key_management = { path = "../../key_management"} async-trait = "0.1" state_manager = { path = "../state_manager" } diff --git a/blockchain/state_manager/Cargo.toml b/blockchain/state_manager/Cargo.toml index be09626def99..a3e283911d8c 100644 --- a/blockchain/state_manager/Cargo.toml +++ b/blockchain/state_manager/Cargo.toml @@ -19,7 +19,7 @@ interpreter = { path = "../../vm/interpreter/" } ipld_amt = { path = "../../ipld/amt/" } clock = { path = "../../node/clock" } chain = { path = "../chain" } -async-std = "1.5.0" +async-std = "1.6.3" async-log = "2.0.0" log = "0.4.8" fil_types = { path = "../../types" } diff --git a/blockchain/state_manager/src/lib.rs b/blockchain/state_manager/src/lib.rs index 66d8d6231f18..592cd7e48d0e 100644 --- a/blockchain/state_manager/src/lib.rs +++ b/blockchain/state_manager/src/lib.rs @@ -24,6 +24,7 @@ use interpreter::{ }; use ipld_amt::Amt; use log::{trace, warn}; +use message::{message_receipt, unsigned_message}; use message::{ChainMessage, Message, MessageReceipt, UnsignedMessage}; use num_bigint::{bigint_ser, BigInt}; use serde::{Deserialize, Serialize}; @@ -39,7 +40,9 @@ pub type CidPair = (Cid, Cid); #[derive(Serialize, Deserialize)] #[serde(rename_all = "PascalCase")] pub struct InvocResult { + #[serde(with = "unsigned_message::json")] pub msg: UnsignedMessage, + #[serde(with = "message_receipt::json::opt")] pub msg_rct: Option, pub error: Option, } diff --git a/forest/Cargo.toml b/forest/Cargo.toml index 229f759eb200..354166ce29dc 100644 --- a/forest/Cargo.toml +++ b/forest/Cargo.toml @@ -13,7 +13,7 @@ libp2p = "0.24" futures = "0.3.5" log = "0.4.8" async-log = "2.0.0" -async-std = { version = "1.6.0", features = ["attributes"] } +async-std = { version = "1.6.3", features = ["attributes"] } serde = { version = "1.0", features = ["derive"] } pretty_env_logger = "0.4.0" ctrlc = "3.1.4" @@ -21,6 +21,7 @@ chain_sync = { path = "../blockchain/chain_sync" } state_manager = { path = "../blockchain/state_manager" } cid = { package = "forest_cid", path = "../ipld/cid", features = ["json"] } forest_car = { path = "../ipld/car" } +flo_stream = "0.4.0" num-bigint = { path = "../utils/bigint", package = "forest_bigint" } blocks = { package = "forest_blocks", path = "../blockchain/blocks" } ipld_blockstore = { path = "../ipld/blockstore", features = ["rocksdb"] } diff --git a/forest/src/cli/genesis.rs b/forest/src/cli/genesis.rs index 09f7a13e08c4..01f3f8d76ff5 100644 --- a/forest/src/cli/genesis.rs +++ b/forest/src/cli/genesis.rs @@ -47,7 +47,6 @@ where "Genesis not initialized properly, failed to retrieve network name. \ Requires either a previously initialized genesis or with genesis config option set", ); - Ok((Tipset::new(vec![genesis])?, network_name)) } diff --git a/forest/src/daemon.rs b/forest/src/daemon.rs index 37f4a3d98178..e89e7b86c01f 100644 --- a/forest/src/daemon.rs +++ b/forest/src/daemon.rs @@ -9,6 +9,7 @@ use beacon::{DrandBeacon, DEFAULT_DRAND_URL}; use chain::ChainStore; use chain_sync::ChainSyncer; use db::RocksDb; +use flo_stream::{MessagePublisher, Publisher}; use forest_libp2p::{get_keypair, Libp2pService}; use libp2p::identity::{ed25519, Keypair}; use log::{debug, info, trace}; @@ -68,7 +69,8 @@ pub(super) async fn start(config: Config) { let state_manager = Arc::new(StateManager::new(Arc::clone(&db))); // Initialize mpool - let subscriber = chain_store.subscribe().await; + let publisher = chain_store.publisher(); + let subscriber = publisher.write().await.subscribe(); let provider = MpoolRpcProvider::new(subscriber, Arc::clone(&state_manager)); let mpool = Arc::new( MessagePool::new(provider, network_name.clone()) @@ -89,8 +91,9 @@ pub(super) async fn start(config: Config) { .unwrap(); // Initialize ChainSyncer + let chain_store_arc = Arc::new(chain_store); let chain_syncer = ChainSyncer::new( - Arc::new(chain_store), + chain_store_arc.clone(), Arc::clone(&state_manager), Arc::new(beacon), network_send.clone(), @@ -108,7 +111,6 @@ pub(super) async fn start(config: Config) { let p2p_task = task::spawn(async { p2p_service.run().await; }); - let rpc_task = if config.enable_rpc { let keystore_rpc = Arc::clone(&keystore); let rpc_listen = format!("127.0.0.1:{}", &config.rpc_port); @@ -123,6 +125,8 @@ pub(super) async fn start(config: Config) { sync_state, network_send, network_name, + chain_store: chain_store_arc, + events_pubsub: Arc::new(RwLock::new(Publisher::new(1000))), }, &rpc_listen, ) diff --git a/ipld/Cargo.toml b/ipld/Cargo.toml index 18820717a29d..232e134e28ca 100644 --- a/ipld/Cargo.toml +++ b/ipld/Cargo.toml @@ -28,6 +28,6 @@ submodule_tests = ["json"] [dev-dependencies] serde_json = "1.0" -async-std = { version = "1.6.0", features = ["attributes"] } +async-std = { version = "1.6.3", features = ["attributes"] } ipld_blockstore = { path = "blockstore" } db = { path = "../node/db" } diff --git a/ipld/graphsync/Cargo.toml b/ipld/graphsync/Cargo.toml index 421814b68b0e..ec19de71aa28 100644 --- a/ipld/graphsync/Cargo.toml +++ b/ipld/graphsync/Cargo.toml @@ -27,5 +27,5 @@ protoc-rust = "2.14.0" [dev-dependencies] multihash = "0.10" -async-std = "1.5" +async-std = "1.6.3" rand = "0.7" \ No newline at end of file diff --git a/node/forest_libp2p/Cargo.toml b/node/forest_libp2p/Cargo.toml index d6de9b6012aa..eff517bca524 100644 --- a/node/forest_libp2p/Cargo.toml +++ b/node/forest_libp2p/Cargo.toml @@ -12,7 +12,7 @@ futures = "0.3.5" futures-util = "0.3.5" futures_codec = "0.4.0" log = "0.4.8" -async-std = { version = "1.6.0", features = ["unstable"] } +async-std = { version = "1.6.3", features = ["unstable"] } serde = { version = "1.0", features = ["derive"] } forest_blocks = { path = "../../blockchain/blocks" } forest_message = { path = "../../vm/message" } diff --git a/node/rpc/Cargo.toml b/node/rpc/Cargo.toml index 57dc989f952b..b30098eff137 100644 --- a/node/rpc/Cargo.toml +++ b/node/rpc/Cargo.toml @@ -6,18 +6,17 @@ edition = "2018" [dependencies] actor = { path = "../../vm/actor/" } -async-std = { version = "1.6.0", features = ["attributes"] } -tide = "0.9.0" +async-std = { version = "1.6.3", features = ["attributes"] } serde = { version = "1.0.101", default-features = false, features = ["derive"] } serde_json = "1.0.48" -chain = { path = "../../blockchain/chain" } +chain = { path = "../../blockchain/chain" , features = ["json"]} chain_sync = { path = "../../blockchain/chain_sync" } blockstore = { package = "ipld_blockstore", path = "../../ipld/blockstore" } cid = { package = "forest_cid", path = "../../ipld/cid", features = ["json"] } blocks = { package = "forest_blocks", path = "../../blockchain/blocks", features = ["json"] } clock = { path = "../clock" } message = { package = "forest_message", path = "../../vm/message", features = ["json"] } -jsonrpc-v2 = { version = "0.5.2", features = ["easy-errors", "macros"] } +jsonrpc-v2 = { version = "0.5.2", git="https://github.com/ChainSafe/jsonrpc-v2" ,features = ["easy-errors", "macros"] } message_pool = { path = "../../blockchain/message_pool" } crypto = { package = "forest_crypto", path = "../../crypto", features = ["json"] } num-traits = "0.2.11" @@ -34,6 +33,11 @@ rand = "0.7" interpreter = { path = "../../vm/interpreter/" } fil_types = { path = "../../types" } bitfield = { path = "../../utils/bitfield",features = ["json"] } +futures = "0.3.5" +async-tungstenite = "0.8.0" +async-log = "2.0.0" +log ="0.4.8" +flo_stream = "0.4.0" [dev-dependencies] db = { path = "../db" } diff --git a/node/rpc/src/chain_api.rs b/node/rpc/src/chain_api.rs index 31ba8a49f15d..6ccac9183a0f 100644 --- a/node/rpc/src/chain_api.rs +++ b/node/rpc/src/chain_api.rs @@ -56,6 +56,25 @@ where Ok(UnsignedMessageJson(ret)) } +pub(crate) async fn chain_notify<'a, DB, KS>( + data: Data>, + Params(params): Params, +) -> Result +where + DB: BlockStore + Send + Sync + 'static, + KS: KeyStore + Send + Sync + 'static, +{ + let data_subscribe = data.chain_store.subscribe().await; + let index = chain::sub_head_changes( + data_subscribe, + &data.chain_store.heaviest_tipset().await, + params, + data.events_pubsub.clone(), + ) + .await?; + Ok(index) +} + pub(crate) async fn chain_read_obj( data: Data>, Params(params): Params<(CidJson,)>, diff --git a/node/rpc/src/lib.rs b/node/rpc/src/lib.rs index 1ce85b2d44fa..9462d89f4615 100644 --- a/node/rpc/src/lib.rs +++ b/node/rpc/src/lib.rs @@ -9,17 +9,40 @@ mod sync_api; mod wallet_api; use crate::state_api::*; -use async_std::sync::{RwLock, Sender}; +use async_log::span; +use async_std::net::{TcpListener, TcpStream}; +use async_std::sync::{Arc, RwLock, Sender}; +use async_std::task::{self, JoinHandle}; +use async_tungstenite::{tungstenite::Message, WebSocketStream}; use blockstore::BlockStore; +use chain::{headchange_json::HeadChangeJson, ChainStore, EventsPayload}; use chain_sync::{BadBlockCache, SyncState}; +use flo_stream::{MessagePublisher, Publisher, Subscriber}; use forest_libp2p::NetworkMessage; -use jsonrpc_v2::{Data, MapRouter, RequestObject, Server}; +use futures::future; +use futures::sink::SinkExt; +use futures::stream::{SplitSink, StreamExt}; +use futures::TryFutureExt; +use jsonrpc_v2::{ + Data, Error, Id, MapRouter, RequestBuilder, RequestObject, ResponseObject, ResponseObjects, + Server, V2, +}; +use log::{debug, error, info, warn}; use message_pool::{MessagePool, MpoolRpcProvider}; +use serde::Serialize; use state_manager::StateManager; -use std::sync::Arc; -use tide::{Request, Response, StatusCode}; use wallet::KeyStore; +type WsSink = SplitSink, async_tungstenite::tungstenite::Message>; + +const CHAIN_NOTIFY_METHOD_NAME: &str = "Filecoin.ChainNotify"; +#[derive(Serialize)] +struct StreamingData<'a> { + json_rpc: &'a str, + method: &'a str, + params: (usize, Vec>), +} + /// This is where you store persistant data, or at least access to stateful data. pub struct RpcState where @@ -28,17 +51,13 @@ where { pub state_manager: Arc>, pub keystore: Arc>, + pub events_pubsub: Arc>>, pub mpool: Arc>>, pub bad_blocks: Arc, pub sync_state: Arc>>>>, pub network_send: Sender, pub network_name: String, -} - -async fn handle_json_rpc(mut req: Request>) -> tide::Result { - let call: RequestObject = req.body_json().await?; - let res = req.state().handle(call).await; - Ok(Response::new(StatusCode::Ok).body_json(&res)?) + pub chain_store: Arc>, } pub async fn start_rpc(state: RpcState, rpc_endpoint: &str) @@ -51,114 +70,416 @@ where use mpool_api::*; use sync_api::*; use wallet_api::*; - + let events_pubsub = state.events_pubsub.clone(); let rpc = Server::new() .with_data(Data::new(state)) .with_method( "Filecoin.ChainGetMessage", chain_api::chain_get_message::, + false, ) - .with_method("Filecoin.ChainGetObj", chain_read_obj::) - .with_method("Filecoin.ChainHasObj", chain_has_obj::) + .with_method("Filecoin.ChainGetObj", chain_read_obj::, false) + .with_method("Filecoin.ChainHasObj", chain_has_obj::, false) .with_method( "Filecoin.ChainGetBlockMessages", chain_block_messages::, + false, ) .with_method( "Filecoin.ChainGetTipsetByHeight", chain_get_tipset_by_height::, + false, + ) + .with_method( + "Filecoin.ChainGetGenesis", + chain_get_genesis::, + false, + ) + .with_method( + "Filecoin.ChainTipsetWeight", + chain_tipset_weight::, + false, + ) + .with_method("Filecoin.ChainGetTipset", chain_get_tipset::, false) + .with_method( + "Filecoin.GetRandomness", + chain_get_randomness::, + false, ) - .with_method("Filecoin.ChainGetGenesis", chain_get_genesis::) - .with_method("Filecoin.ChainTipsetWeight", chain_tipset_weight::) - .with_method("Filecoin.ChainGetTipset", chain_get_tipset::) - .with_method("Filecoin.GetRandomness", chain_get_randomness::) .with_method( "Filecoin.ChainGetBlock", chain_api::chain_get_block::, + false, ) - .with_method("Filecoin.ChainHead", chain_head::) + .with_method(CHAIN_NOTIFY_METHOD_NAME, chain_notify::, true) + .with_method("Filecoin.ChainHead", chain_head::, false) // Message Pool API .with_method( "Filecoin.MpoolEstimateGasPrice", estimate_gas_premium::, + false, + ) + .with_method( + "Filecoin.MpoolGetNonce", + mpool_get_sequence::, + false, + ) + .with_method("Filecoin.MpoolPending", mpool_pending::, false) + .with_method("Filecoin.MpoolPush", mpool_push::, false) + .with_method( + "Filecoin.MpoolPushMessage", + mpool_push_message::, + false, ) - .with_method("Filecoin.MpoolGetNonce", mpool_get_sequence::) - .with_method("Filecoin.MpoolPending", mpool_pending::) - .with_method("Filecoin.MpoolPush", mpool_push::) - .with_method("Filecoin.MpoolPushMessage", mpool_push_message::) // Sync API - .with_method("Filecoin.SyncCheckBad", sync_check_bad::) - .with_method("Filecoin.SyncMarkBad", sync_mark_bad::) - .with_method("Filecoin.SyncState", sync_state::) - .with_method("Filecoin.SyncSubmitBlock", sync_submit_block::) + .with_method("Filecoin.SyncCheckBad", sync_check_bad::, false) + .with_method("Filecoin.SyncMarkBad", sync_mark_bad::, false) + .with_method("Filecoin.SyncState", sync_state::, false) + .with_method( + "Filecoin.SyncSubmitBlock", + sync_submit_block::, + false, + ) // Wallet API - .with_method("Filecoin.WalletBalance", wallet_balance::) + .with_method("Filecoin.WalletBalance", wallet_balance::, false) .with_method( "Filecoin.WalletDefaultAddress", wallet_default_address::, + false, ) - .with_method("Filecoin.WalletExport", wallet_export::) - .with_method("Filecoin.WalletHas", wallet_has::) - .with_method("Filecoin.WalletImport", wallet_import::) - .with_method("Filecoin.WalletList", wallet_list::) - .with_method("Filecoin.WalletNew", wallet_new::) - .with_method("Filecoin.WalletSetDefault", wallet_set_default::) - .with_method("Filecoin.WalletSign", wallet_sign::) - .with_method("Filecoin.WalletSignMessage", wallet_sign_message::) - .with_method("Filecoin.WalletVerify", wallet_verify::) + .with_method("Filecoin.WalletExport", wallet_export::, false) + .with_method("Filecoin.WalletHas", wallet_has::, false) + .with_method("Filecoin.WalletImport", wallet_import::, false) + .with_method("Filecoin.WalletList", wallet_list::, false) + .with_method("Filecoin.WalletNew", wallet_new::, false) + .with_method( + "Filecoin.WalletSetDefault", + wallet_set_default::, + false, + ) + .with_method("Filecoin.WalletSign", wallet_sign::, false) + .with_method( + "Filecoin.WalletSignMessage", + wallet_sign_message::, + false, + ) + .with_method("Filecoin.WalletVerify", wallet_verify::, false) // State API - .with_method("Filecoin.StateMinerSector", state_miner_sector::) - .with_method("Filecoin.StateCall", state_call::) + .with_method( + "Filecoin.StateMinerSector", + state_miner_sector::, + false, + ) + .with_method("Filecoin.StateCall", state_call::, false) .with_method( "Filecoin.StateMinerDeadlines", state_miner_deadlines::, + false, ) .with_method( "Filecoin.StateSectorPrecommitInfo", state_sector_precommit_info::, + false, + ) + .with_method( + "Filecoin.StateSectorInfo", + state_sector_info::, + false, ) - .with_method("Filecoin.StateSectorInfo", state_sector_info::) .with_method( "Filecoin.StateMinerProvingSet", state_miner_proving_set::, + false, ) .with_method( "Filecoin.StateMinerProvingDeadline", state_miner_proving_deadline::, + false, + ) + .with_method("Filecoin.StateMinerInfo", state_miner_info::, false) + .with_method( + "Filecoin.StateMinerFaults", + state_miner_faults::, + false, ) - .with_method("Filecoin.StateMinerInfo", state_miner_info::) - .with_method("Filecoin.StateMinerFaults", state_miner_faults::) .with_method( "Filecoin.StateAllMinerFaults", state_all_miner_faults::, + false, ) .with_method( "Filecoin.StateMinerRecoveries", state_miner_recoveries::, + false, + ) + .with_method("Filecoin.StateReplay", state_replay::, false) + .with_method("Filecoin.StateGetActor", state_get_actor::, false) + .with_method( + "Filecoin.StateAccountKey", + state_account_key::, + false, ) - .with_method("Filecoin.StateReplay", state_replay::) - .with_method("Filecoin.StateGetActor", state_get_actor::) - .with_method("Filecoin.StateAccountKey", state_account_key::) - .with_method("Filecoin.StateLookupId", state_lookup_id::) + .with_method("Filecoin.StateLookupId", state_lookup_id::, false) .with_method( "Filecoin.StateMartketBalance", state_market_balance::, + false, + ) + .with_method( + "Filecoin.StateGetReceipt", + state_get_receipt::, + false, ) - .with_method("Filecoin.StateGetReceipt", state_get_receipt::) - .with_method("Filecoin.StateWaitMsg", state_wait_msg::) + .with_method("Filecoin.StateWaitMsg", state_wait_msg::, false) // Gas API .with_method( "Filecoin.GasEstimateGasLimit", gas_estimate_gas_limit::, + false, ) .with_method( "Filecoin.GasEstimateGasPremium", gas_estimate_gas_premium::, + false, + ) + .with_method( + "Filecoin.GasEstimateFeeCap", + gas_estimate_fee_cap::, + false, ) - .with_method("Filecoin.GasEstimateFeeCap", gas_estimate_fee_cap::) .finish_unwrapped(); - let mut app = tide::Server::with_state(rpc); - app.at("/rpc/v0").post(handle_json_rpc); - app.listen(rpc_endpoint).await.unwrap(); + let try_socket = TcpListener::bind(rpc_endpoint).await; + let listener = try_socket.expect("Failed to bind to addr"); + let rpc_state = Arc::new(rpc); + + info!("waiting for web socket connections"); + while let Ok((stream, addr)) = listener.accept().await { + let subscriber = events_pubsub.write().await.subscribe(); + task::spawn(handle_connection_and_log( + rpc_state.clone(), + stream, + addr, + events_pubsub.clone(), + subscriber, + )); + } + + info!("Stopped accepting websocket connections"); +} + +async fn handle_connection_and_log( + state: Arc>, + tcp_stream: TcpStream, + addr: std::net::SocketAddr, + events_out: Arc>>, + events_in: Subscriber, +) { + span!("handle_connection_and_log", { + if let Ok(ws_stream) = async_tungstenite::accept_async(tcp_stream).await { + debug!("accepted websocket connection at {:}", addr); + let (ws_sender, mut ws_receiver) = ws_stream.split(); + let ws_sender = Arc::new(RwLock::new(ws_sender)); + let mut chain_notify_count: usize = 0; + while let Some(message_result) = ws_receiver.next().await { + match message_result { + Ok(message) => { + let request_text = message.into_text().unwrap(); + match serde_json::from_str(&request_text) + as Result + { + Ok(call) => { + // hacky but due to the limitations of jsonrpc_v2 impl + // if this expands, better to implement some sort of middleware + let call = if &*call.method == CHAIN_NOTIFY_METHOD_NAME { + chain_notify_count += 1; + RequestBuilder::default() + .with_id(call.id.unwrap_or_default().unwrap_or_default()) + .with_params(chain_notify_count) + .with_method(CHAIN_NOTIFY_METHOD_NAME) + .finish() + } else { + call + }; + let response = state.clone().handle(call).await; + let error_send = ws_sender.clone(); + + // initiate response and streaming if applicable + let join_handle = streaming_payload( + ws_sender.clone(), + response, + chain_notify_count, + events_out.clone(), + events_in.clone(), + ) + .map_err(|e| async move { + send_error( + 3, + &error_send, + format!( + "channel id {:}, error {:?}", + chain_notify_count, + e.message() + ), + ) + .await + .unwrap_or_else(|e| { + error!("error {:?} on socket {:?}", e.message(), addr) + }); + }) + .await + .unwrap_or_else(|_| { + error!("error sending on socket {:?}", addr); + None + }); + + // wait for join handle to complete if there is error and send it over the network and cancel streaming + let error_join_send = ws_sender.clone(); + let handle_events_out = events_out.clone(); + task::spawn(async move { + if let Some(handle) = join_handle { + handle + .map_err(|e| async move { + send_error( + 3, + &error_join_send, + format!( + "channel id {:}, error {:?}", + chain_notify_count, + e.message() + ), + ) + .await + .unwrap_or_else(|e| { + error!( + "error {:?} on socket {:?}", + e.message(), + addr + ) + }); + }) + .await + .unwrap_or_else(|_| { + error!("error sending on socket {:?}", addr) + }); + + handle_events_out + .write() + .await + .publish(EventsPayload::TaskCancel( + chain_notify_count, + (), + )) + .await; + } else { + handle_events_out + .write() + .await + .publish(EventsPayload::TaskCancel( + chain_notify_count, + (), + )) + .await + } + }); + } + Err(e) => send_error(1, &ws_sender, e.to_string()) + .await + .unwrap_or_else(|e| { + error!("error {:?} on socket {:?}", e.message(), addr) + }), + } + } + Err(e) => send_error(2, &ws_sender, e.to_string()) + .await + .unwrap_or_else(|e| error!("error {:?} on socket {:?}", e.message(), addr)), + }; + } + } else { + warn!("web socket connection failed at {:}", addr) + } + }) +} + +async fn send_error(code: i64, ws_sender: &RwLock, message: String) -> Result<(), Error> { + let response = ResponseObjects::One(ResponseObject::Error { + jsonrpc: V2, + error: Error::Full { + code, + message, + data: None, + }, + id: Id::Null, + }); + let response_text = serde_json::to_string(&response)?; + ws_sender + .write() + .await + .send(Message::text(response_text)) + .await?; + Ok(()) +} +async fn streaming_payload( + ws_sender: Arc>, + response_object: ResponseObjects, + streaming_count: usize, + events_out: Arc>>, + events_in: Subscriber, +) -> Result>>, Error> { + let response_text = serde_json::to_string(&response_object)?; + ws_sender + .write() + .await + .send(Message::text(response_text)) + .await?; + if let ResponseObjects::One(ResponseObject::Result { + jsonrpc: _, + result: _, + id: _, + streaming, + }) = response_object + { + if streaming { + let handle = task::spawn(async move { + let mut filter_on_channel_id = events_in.filter(|s| { + future::ready( + s.sub_head_changes() + .map(|s| s.0 == streaming_count) + .unwrap_or_default(), + ) + }); + while let Some(event) = filter_on_channel_id.next().await { + if let EventsPayload::SubHeadChanges(ref index_to_head_change) = event { + if streaming_count == index_to_head_change.0 { + let head_change = (&index_to_head_change.1).into(); + let data = StreamingData { + json_rpc: "2.0", + method: "xrpc.ch.val", + params: (streaming_count, vec![head_change]), + }; + let response_text = serde_json::to_string(&data)?; + ws_sender + .write() + .await + .send(Message::text(response_text)) + .await?; + } + } + } + + Ok::<(), Error>(()) + }); + + Ok(Some(handle)) + } else { + Ok(None) + } + } else { + events_out + .write() + .await + .publish(EventsPayload::TaskCancel(streaming_count, ())) + .await; + Ok(None) + } } diff --git a/node/rpc/src/state_api.rs b/node/rpc/src/state_api.rs index eefd22349047..84c5f7e28baf 100644 --- a/node/rpc/src/state_api.rs +++ b/node/rpc/src/state_api.rs @@ -6,7 +6,7 @@ use actor::miner::{ compute_proving_period_deadline, ChainSectorInfo, DeadlineInfo, Deadlines, Fault, MinerInfo, SectorOnChainInfo, SectorPreCommitOnChainInfo, State, }; -use address::Address; +use address::{json::AddressJson, Address}; use async_std::task; use bitfield::json::BitFieldJson; use blocks::{tipset_json::TipsetJson, Tipset, TipsetKeys}; @@ -19,7 +19,7 @@ use message::{ message_receipt::json::MessageReceiptJson, unsigned_message::{json::UnsignedMessageJson, UnsignedMessage}, }; -use serde::{Deserialize, Serialize}; +use serde::Serialize; use state_manager::{InvocResult, MarketBalance, StateManager}; use state_tree::StateTree; use wallet::KeyStore; @@ -31,24 +31,6 @@ pub struct MessageLookup { pub tipset: TipsetJson, } -#[derive(Serialize, Deserialize)] -#[serde(rename_all = "PascalCase")] -pub struct InvocResultJson { - pub msg: UnsignedMessageJson, - pub msg_rct: Option, - pub error: Option, -} - -impl From for InvocResultJson { - fn from(invoc: InvocResult) -> Self { - InvocResultJson { - msg: invoc.msg.into(), - msg_rct: invoc.msg_rct.map(|s| s.into()), - error: invoc.error, - } - } -} - /// returns info about the given miner's sectors. If the filter bitfield is nil, all sectors are included. /// If the filterOut boolean is set to true, any sectors in the filter are excluded. /// If false, only those sectors in the filter are included. @@ -259,16 +241,16 @@ pub(crate) async fn state_replay< >( data: Data>, Params(params): Params<(CidJson, TipsetKeys)>, -) -> Result { +) -> Result { let state_manager = &data.state_manager; let (cidjson, key) = params; let cid = cidjson.into(); let tipset = chain::tipset_from_keys(data.state_manager.blockstore(), &key)?; let (msg, ret) = state_manager.replay(&tipset, &cid)?; - Ok(InvocResultJson { - msg: msg.into(), - msg_rct: ret.as_ref().map(|s| s.msg_receipt.clone().into()), + Ok(InvocResult { + msg, + msg_rct: ret.as_ref().map(|s| s.msg_receipt.clone()), error: ret .map(|act| act.act_error.map(|e| e.to_string())) .unwrap_or_default(), @@ -297,13 +279,13 @@ pub(crate) async fn state_account_key< >( data: Data>, Params(params): Params<(Address, TipsetKeys)>, -) -> Result, JsonRpcError> { +) -> Result, JsonRpcError> { let state_manager = &data.state_manager; let (actor, key) = params; let tipset = chain::tipset_from_keys(data.state_manager.blockstore(), &key)?; let state = state_for_ts(&state_manager, Some(tipset))?; let address = interpreter::resolve_to_key_addr(&state, state_manager.blockstore(), &actor)?; - Ok(address.into()) + Ok(Some(address.into())) } /// retrieves the ID address of the given address pub(crate) async fn state_lookup_id< diff --git a/node/rpc/src/sync_api.rs b/node/rpc/src/sync_api.rs index 2738698b9e5b..680d70934ec1 100644 --- a/node/rpc/src/sync_api.rs +++ b/node/rpc/src/sync_api.rs @@ -100,6 +100,7 @@ mod tests { use chain::ChainStore; use chain_sync::SyncStage; use db::{MemoryDB, Store}; + use flo_stream::Publisher; use forest_libp2p::NetworkMessage; use futures::StreamExt; use message_pool::{MessagePool, MpoolRpcProvider}; @@ -117,22 +118,24 @@ mod tests { let (network_send, network_rx) = channel(5); let db = Arc::new(MemoryDB::default()); let state_manager = Arc::new(StateManager::new(db.clone())); - - let pool = task::block_on(async { - let cs = ChainStore::new(db.clone()); + let cs = ChainStore::new(db.clone()); + let cs_arc = Arc::new(cs); + let state_manager_for_thread = state_manager.clone(); + let cs_move = cs_arc.clone(); + let pool = task::block_on(async move { let bz = hex::decode("904300e80781586082cb7477a801f55c1f2ea5e5d1167661feea60a39f697e1099af132682b81cc5047beacf5b6e80d5f52b9fd90323fb8510a5396416dd076c13c85619e176558582744053a3faef6764829aa02132a1571a76aabdc498a638ea0054d3bb57f41d82015860812d2396cc4592cdf7f829374b01ffd03c5469a4b0a9acc5ccc642797aa0a5498b97b28d90820fedc6f79ff0a6005f5c15dbaca3b8a45720af7ed53000555667207a0ccb50073cd24510995abd4c4e45c1e9e114905018b2da9454190499941e818201582012dd0a6a7d0e222a97926da03adb5a7768d31cc7c5c2bd6828e14a7d25fa3a608182004b76616c69642070726f6f6681d82a5827000171a0e4022030f89a8b0373ad69079dbcbc5addfe9b34dce932189786e50d3eb432ede3ba9c43000f0001d82a5827000171a0e4022052238c7d15c100c1b9ebf849541810c9e3c2d86e826512c6c416d2318fcd496dd82a5827000171a0e40220e5658b3d18cd06e1db9015b4b0ec55c123a24d5be1ea24d83938c5b8397b4f2fd82a5827000171a0e4022018d351341c302a21786b585708c9873565a0d07c42521d4aaf52da3ff6f2e461586102c000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001a5f2c5439586102b5cd48724dce0fec8799d77fd6c5113276e7f470c8391faa0b5a6033a3eaf357d635705c36abe10309d73592727289680515afd9d424793ba4796b052682d21b03c5c8a37d94827fecc59cdc5750e198fdf20dee012f4d627c6665132298ab95004500053724e0").unwrap(); let header = BlockHeader::unmarshal_cbor(&bz).unwrap(); let ts = Tipset::new(vec![header]).unwrap(); - let subscriber = cs.subscribe().await; - let db = cs.db.clone(); + let subscriber = cs_move.subscribe().await; + let db = cs_move.db.clone(); let tsk = ts.key().cids.clone(); - cs.set_heaviest_tipset(Arc::new(ts)).await.unwrap(); + cs_move.set_heaviest_tipset(Arc::new(ts)).await.unwrap(); for i in tsk { let bz2 = bz.clone(); db.as_ref().write(i.key(), bz2).unwrap(); } - let provider = MpoolRpcProvider::new(subscriber, state_manager.clone()); + let provider = MpoolRpcProvider::new(subscriber, state_manager_for_thread.clone()); MessagePool::new(provider, "test".to_string()) .await .unwrap() @@ -146,6 +149,8 @@ mod tests { sync_state: Arc::new(RwLock::new(vec![Default::default()])), network_send, network_name: TEST_NET_NAME.to_owned(), + events_pubsub: Arc::new(RwLock::new(Publisher::new(1000))), + chain_store: cs_arc, }); (state, network_rx) } diff --git a/utils/test_utils/Cargo.toml b/utils/test_utils/Cargo.toml index 6e4b39fef83d..f73c4dea5488 100644 --- a/utils/test_utils/Cargo.toml +++ b/utils/test_utils/Cargo.toml @@ -16,7 +16,7 @@ chain = { path = "../../blockchain/chain/", optional = true } message = { package = "forest_message", path = "../../vm/message", optional = true } num-bigint = { path = "../../utils/bigint", package = "forest_bigint" } crypto = { package = "forest_crypto", path = "../../crypto" } -async-std = { version = "1.6.0", features = ["unstable"] } +async-std = { version = "1.6.3", features = ["unstable"] } forest_libp2p = { path = "../../node/forest_libp2p/", optional = true } encoding = { package = "forest_encoding", path = "../../encoding/"} base64 = "0.12.1" diff --git a/vm/address/src/lib.rs b/vm/address/src/lib.rs index ef991cd1ef2e..f23f20ef6d55 100644 --- a/vm/address/src/lib.rs +++ b/vm/address/src/lib.rs @@ -322,6 +322,22 @@ pub mod json { use serde::{Deserialize, Deserializer, Serializer}; use std::borrow::Cow; + /// Wrapper for serializing and deserializing a SignedMessage from JSON. + #[derive(Deserialize, Serialize)] + #[serde(transparent)] + pub struct AddressJson(#[serde(with = "self")] pub Address); + + /// Wrapper for serializing a SignedMessage reference to JSON. + #[derive(Serialize)] + #[serde(transparent)] + pub struct AddressJsonRef<'a>(#[serde(with = "self")] pub &'a Address); + + impl From
for AddressJson { + fn from(address: Address) -> Self { + Self(address) + } + } + pub fn serialize(m: &Address, serializer: S) -> Result where S: Serializer, diff --git a/vm/message/src/message_receipt.rs b/vm/message/src/message_receipt.rs index 9f7d9ecda4f8..ff484b23ead2 100644 --- a/vm/message/src/message_receipt.rs +++ b/vm/message/src/message_receipt.rs @@ -108,4 +108,25 @@ pub mod json { deserializer.deserialize_any(GoVecVisitor::::new()) } } + + pub mod opt { + use super::*; + + pub fn serialize(v: &Option, serializer: S) -> Result + where + S: Serializer, + { + v.as_ref() + .map(|s| MessageReceiptJsonRef(s)) + .serialize(serializer) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> + where + D: Deserializer<'de>, + { + let s: Option = Deserialize::deserialize(deserializer)?; + Ok(s) + } + } }