From c8256f04569b8d05b2981c423f68aed730e5b983 Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Thu, 10 Dec 2020 23:29:09 -0500 Subject: [PATCH 1/7] add net addrs listen --- node/forest_libp2p/src/lib.rs | 4 ++++ node/forest_libp2p/src/service.rs | 29 +++++++++++++++++++++++-- node/rpc/src/lib.rs | 7 ++++++ node/rpc/src/net_api.rs | 36 +++++++++++++++++++++++++++++++ 4 files changed, 74 insertions(+), 2 deletions(-) create mode 100644 node/rpc/src/net_api.rs diff --git a/node/forest_libp2p/src/lib.rs b/node/forest_libp2p/src/lib.rs index 93f17cf913c3..4cee3e0749a1 100644 --- a/node/forest_libp2p/src/lib.rs +++ b/node/forest_libp2p/src/lib.rs @@ -17,3 +17,7 @@ pub use self::behaviour::*; pub use self::chain_exchange::{ChainExchangeRequest, MESSAGES}; pub use self::config::*; pub use self::service::*; + +// Re-export some libp2p types +pub use libp2p::core::PeerId; +pub use libp2p::multiaddr::Multiaddr; diff --git a/node/forest_libp2p/src/service.rs b/node/forest_libp2p/src/service.rs index 3f376df79948..3ba2c4644b51 100644 --- a/node/forest_libp2p/src/service.rs +++ b/node/forest_libp2p/src/service.rs @@ -33,6 +33,7 @@ use std::io::{Error, ErrorKind}; use std::sync::Arc; use std::time::Duration; use utils::read_file_to_vec; +use libp2p::core::Multiaddr; pub const PUBSUB_BLOCK_STR: &str = "/fil/blocks"; pub const PUBSUB_MSG_STR: &str = "/fil/msgs"; @@ -104,6 +105,19 @@ pub enum NetworkMessage { cid: Cid, response_channel: OneShotSender<()>, }, + JSONRPCRequest { + method: NetRPCMethods, + response_channel: OneShotSender, + } +} +#[derive(Debug)] +pub enum NetRPCMethods { + NetAddrsListen, +} +#[derive(Debug)] +pub enum NetRPCResponse { + /// Your peer id and all the multiaddrs you are listening on + NetAddrsListen(PeerId, Vec) } /// The Libp2pService listens to events from the Libp2p swarm. pub struct Libp2pService { @@ -324,9 +338,20 @@ where warn!("Failed to send a bitswap want_block: {}", e.to_string()); } else if let Some(chans) = self.bitswap_response_channels.get_mut(&cid) { chans.push(response_channel); - } else { - self.bitswap_response_channels.insert(cid, vec![response_channel]); + } else { + self.bitswap_response_channels.insert(cid, vec![response_channel]); + } + } + NetworkMessage::JSONRPCRequest {method, response_channel} => { + match method { + NetRPCMethods::NetAddrsListen => { + let listeners: Vec<_> = Swarm::listeners( swarm_stream.get_mut()).cloned().collect(); + let peer_id = Swarm::local_peer_id(swarm_stream.get_mut()); + if let Err(_) = response_channel.send(NetRPCResponse::NetAddrsListen(peer_id.clone(), listeners)) { + warn!("Failed to get Libp2p listeners"); + } } + } } } None => { break; } diff --git a/node/rpc/src/lib.rs b/node/rpc/src/lib.rs index 8fdebec5994e..3876516e3d88 100644 --- a/node/rpc/src/lib.rs +++ b/node/rpc/src/lib.rs @@ -10,6 +10,7 @@ mod mpool_api; mod state_api; mod sync_api; mod wallet_api; +mod net_api; use crate::{beacon_api::beacon_get_entry, common_api::version, state_api::*}; use async_log::span; @@ -307,6 +308,12 @@ where beacon_get_entry::, false, ) + // Net + .with_method( + "Filecoin.NetAddrsListen", + net_api::net_addrs_listen::, + false, + ) .finish_unwrapped(); let try_socket = TcpListener::bind(rpc_endpoint).await; diff --git a/node/rpc/src/net_api.rs b/node/rpc/src/net_api.rs new file mode 100644 index 000000000000..2c8b2d4433ab --- /dev/null +++ b/node/rpc/src/net_api.rs @@ -0,0 +1,36 @@ +use jsonrpc_v2::{Data, Error as JsonRpcError}; +use wallet::KeyStore; +use beacon::Beacon; +use blockstore::BlockStore; +use crate::RpcState; +use serde::Serialize; +use forest_libp2p::{PeerId, Multiaddr, NetworkMessage, NetRPCMethods, NetRPCResponse}; +use futures::channel::oneshot; + + +#[derive(Serialize)] +#[serde(rename_all = "PascalCase")] +pub(crate) struct AddrInfo { + #[serde(rename = "ID")] + id: String, + addrs: Vec, +} +pub(crate) async fn net_addrs_listen< + DB: BlockStore + Send + Sync + 'static, + KS: KeyStore + Send + Sync + 'static, + B: Beacon + Send + Sync + 'static, +>( + data: Data>, +) -> Result { + let (tx, rx) = oneshot::channel(); + let req = NetworkMessage::JSONRPCRequest { + method: NetRPCMethods::NetAddrsListen, + response_channel: tx + }; + data.network_send.send(req).await; + let resp = match rx.await? { + NetRPCResponse::NetAddrsListen(id, addrs) => AddrInfo{id: id.to_string(), addrs} + + }; + Ok(resp) +} \ No newline at end of file From c61978dac6ccc543d3c12227bbb4148fd07530a8 Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Fri, 11 Dec 2020 00:51:51 -0500 Subject: [PATCH 2/7] fix some basic methods to get miner to start running --- node/rpc/Cargo.toml | 2 ++ node/rpc/src/chain_api.rs | 4 ++-- node/rpc/src/lib.rs | 7 ++++++- node/rpc/src/state_api.rs | 20 +++++++++++++++++++- 4 files changed, 29 insertions(+), 4 deletions(-) diff --git a/node/rpc/Cargo.toml b/node/rpc/Cargo.toml index ea26eb1565a5..68885a363a39 100644 --- a/node/rpc/Cargo.toml +++ b/node/rpc/Cargo.toml @@ -52,3 +52,5 @@ db = { package = "forest_db", path = "../db" } futures = "0.3.5" test_utils = { version = "0.1.0", path = "../../utils/test_utils/", features = ["test_constructors"] } hex = "0.4.2" + +bitfield = { package = "forest_bitfield", path = "../../utils/bitfield" } diff --git a/node/rpc/src/chain_api.rs b/node/rpc/src/chain_api.rs index ee8bff2e5398..1174f080d957 100644 --- a/node/rpc/src/chain_api.rs +++ b/node/rpc/src/chain_api.rs @@ -243,14 +243,14 @@ where pub(crate) async fn chain_get_tipset( data: Data>, - Params(params): Params<(TipsetKeys,)>, + Params(params): Params<(TipsetKeysJson,)>, ) -> Result where DB: BlockStore + Send + Sync + 'static, KS: KeyStore + Send + Sync + 'static, B: Beacon + Send + Sync + 'static, { - let (tsk,) = params; + let (TipsetKeysJson(tsk),) = params; let ts = data .state_manager .chain_store() diff --git a/node/rpc/src/lib.rs b/node/rpc/src/lib.rs index ac17ac4b4134..435a8ff0e33d 100644 --- a/node/rpc/src/lib.rs +++ b/node/rpc/src/lib.rs @@ -123,7 +123,7 @@ where false, ) .with_method( - "Filecoin.ChainGetTipset", + "Filecoin.ChainGetTipSet", chain_get_tipset::, false, ) @@ -275,6 +275,11 @@ where false, ) .with_method("Filecoin.StateWaitMsg", state_wait_msg::, false) + .with_method( + "Filecoin.StateMinerSectorAllocated", + state_miner_sector_allocated::, + false + ) .with_method( "Filecoin.StateNetworkName", state_network_name::, diff --git a/node/rpc/src/state_api.rs b/node/rpc/src/state_api.rs index 386e33654457..25d8771d3f7d 100644 --- a/node/rpc/src/state_api.rs +++ b/node/rpc/src/state_api.rs @@ -7,7 +7,7 @@ use actor::miner::MinerInfo; use actor::miner::{ ChainSectorInfo, Fault, SectorOnChainInfo, SectorPreCommitOnChainInfo, State, WorkerKeyChange, }; -use actor::{market, DealID, DealWeight, TokenAmount, STORAGE_MARKET_ACTOR_ADDR}; +use actor::{market, DealID, DealWeight, TokenAmount, STORAGE_MARKET_ACTOR_ADDR, miner}; use address::{json::AddressJson, Address}; use async_std::task; use beacon::{json::BeaconEntryJson, Beacon, BeaconEntry}; @@ -769,6 +769,24 @@ pub(crate) async fn state_market_deals< Ok(out) } +pub(crate) async fn state_miner_sector_allocated< + DB: BlockStore + Send + Sync + 'static, + KS: KeyStore + Send + Sync + 'static, + B: Beacon + Send + Sync + 'static, +>( + data: Data>, + Params(params): Params<(AddressJson, u64, TipsetKeysJson)>, +) -> Result { + let (AddressJson(maddr), sector_num, TipsetKeysJson(tsk)) = params; + let ts = data.chain_store.tipset_from_keys(&tsk).await?; + let mas: miner::State = data.state_manager.load_actor_state(&maddr, ts.parent_state())?; + let mut allocated_sectors: bitfield::BitField = data.chain_store.db + .get(&mas.allocated_sectors)? + .ok_or("allocated sectors bitfield not found")?; + + Ok(allocated_sectors.get(sector_num as usize)) +} + /// returns a state tree given a tipset async fn state_for_ts( state_manager: &Arc>, From b0b9298e8f68178f4771e7847ddebaeec437a2e9 Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Fri, 11 Dec 2020 00:52:07 -0500 Subject: [PATCH 3/7] fmt --- node/forest_libp2p/src/service.rs | 8 ++++---- node/rpc/src/lib.rs | 4 ++-- node/rpc/src/net_api.rs | 21 +++++++++++---------- node/rpc/src/state_api.rs | 10 +++++++--- 4 files changed, 24 insertions(+), 19 deletions(-) diff --git a/node/forest_libp2p/src/service.rs b/node/forest_libp2p/src/service.rs index 3ba2c4644b51..677eb649f71b 100644 --- a/node/forest_libp2p/src/service.rs +++ b/node/forest_libp2p/src/service.rs @@ -18,6 +18,7 @@ use futures::channel::oneshot::Sender as OneShotSender; use futures::select; use futures_util::stream::StreamExt; use ipld_blockstore::BlockStore; +use libp2p::core::Multiaddr; pub use libp2p::gossipsub::Topic; use libp2p::{ core, @@ -33,7 +34,6 @@ use std::io::{Error, ErrorKind}; use std::sync::Arc; use std::time::Duration; use utils::read_file_to_vec; -use libp2p::core::Multiaddr; pub const PUBSUB_BLOCK_STR: &str = "/fil/blocks"; pub const PUBSUB_MSG_STR: &str = "/fil/msgs"; @@ -108,16 +108,16 @@ pub enum NetworkMessage { JSONRPCRequest { method: NetRPCMethods, response_channel: OneShotSender, - } + }, } #[derive(Debug)] pub enum NetRPCMethods { - NetAddrsListen, + NetAddrsListen, } #[derive(Debug)] pub enum NetRPCResponse { /// Your peer id and all the multiaddrs you are listening on - NetAddrsListen(PeerId, Vec) + NetAddrsListen(PeerId, Vec), } /// The Libp2pService listens to events from the Libp2p swarm. pub struct Libp2pService { diff --git a/node/rpc/src/lib.rs b/node/rpc/src/lib.rs index 435a8ff0e33d..23f7d721d7ee 100644 --- a/node/rpc/src/lib.rs +++ b/node/rpc/src/lib.rs @@ -7,10 +7,10 @@ mod chain_api; mod common_api; mod gas_api; mod mpool_api; +mod net_api; mod state_api; mod sync_api; mod wallet_api; -mod net_api; use crate::{beacon_api::beacon_get_entry, common_api::version, state_api::*}; use async_log::span; @@ -278,7 +278,7 @@ where .with_method( "Filecoin.StateMinerSectorAllocated", state_miner_sector_allocated::, - false + false, ) .with_method( "Filecoin.StateNetworkName", diff --git a/node/rpc/src/net_api.rs b/node/rpc/src/net_api.rs index 2c8b2d4433ab..acd18b10f395 100644 --- a/node/rpc/src/net_api.rs +++ b/node/rpc/src/net_api.rs @@ -1,12 +1,11 @@ -use jsonrpc_v2::{Data, Error as JsonRpcError}; -use wallet::KeyStore; +use crate::RpcState; use beacon::Beacon; use blockstore::BlockStore; -use crate::RpcState; -use serde::Serialize; -use forest_libp2p::{PeerId, Multiaddr, NetworkMessage, NetRPCMethods, NetRPCResponse}; +use forest_libp2p::{Multiaddr, NetRPCMethods, NetRPCResponse, NetworkMessage, PeerId}; use futures::channel::oneshot; - +use jsonrpc_v2::{Data, Error as JsonRpcError}; +use serde::Serialize; +use wallet::KeyStore; #[derive(Serialize)] #[serde(rename_all = "PascalCase")] @@ -25,12 +24,14 @@ pub(crate) async fn net_addrs_listen< let (tx, rx) = oneshot::channel(); let req = NetworkMessage::JSONRPCRequest { method: NetRPCMethods::NetAddrsListen, - response_channel: tx + response_channel: tx, }; data.network_send.send(req).await; let resp = match rx.await? { - NetRPCResponse::NetAddrsListen(id, addrs) => AddrInfo{id: id.to_string(), addrs} - + NetRPCResponse::NetAddrsListen(id, addrs) => AddrInfo { + id: id.to_string(), + addrs, + }, }; Ok(resp) -} \ No newline at end of file +} diff --git a/node/rpc/src/state_api.rs b/node/rpc/src/state_api.rs index 25d8771d3f7d..a3bdd2028c21 100644 --- a/node/rpc/src/state_api.rs +++ b/node/rpc/src/state_api.rs @@ -7,7 +7,7 @@ use actor::miner::MinerInfo; use actor::miner::{ ChainSectorInfo, Fault, SectorOnChainInfo, SectorPreCommitOnChainInfo, State, WorkerKeyChange, }; -use actor::{market, DealID, DealWeight, TokenAmount, STORAGE_MARKET_ACTOR_ADDR, miner}; +use actor::{market, miner, DealID, DealWeight, TokenAmount, STORAGE_MARKET_ACTOR_ADDR}; use address::{json::AddressJson, Address}; use async_std::task; use beacon::{json::BeaconEntryJson, Beacon, BeaconEntry}; @@ -779,8 +779,12 @@ pub(crate) async fn state_miner_sector_allocated< ) -> Result { let (AddressJson(maddr), sector_num, TipsetKeysJson(tsk)) = params; let ts = data.chain_store.tipset_from_keys(&tsk).await?; - let mas: miner::State = data.state_manager.load_actor_state(&maddr, ts.parent_state())?; - let mut allocated_sectors: bitfield::BitField = data.chain_store.db + let mas: miner::State = data + .state_manager + .load_actor_state(&maddr, ts.parent_state())?; + let mut allocated_sectors: bitfield::BitField = data + .chain_store + .db .get(&mas.allocated_sectors)? .ok_or("allocated sectors bitfield not found")?; From 9de16f789e7790ecd955e2a37902f7b5b069b272 Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Fri, 11 Dec 2020 00:53:32 -0500 Subject: [PATCH 4/7] clippy --- node/rpc/src/net_api.rs | 2 +- node/rpc/src/state_api.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/node/rpc/src/net_api.rs b/node/rpc/src/net_api.rs index acd18b10f395..24b8c5cbebc9 100644 --- a/node/rpc/src/net_api.rs +++ b/node/rpc/src/net_api.rs @@ -1,7 +1,7 @@ use crate::RpcState; use beacon::Beacon; use blockstore::BlockStore; -use forest_libp2p::{Multiaddr, NetRPCMethods, NetRPCResponse, NetworkMessage, PeerId}; +use forest_libp2p::{Multiaddr, NetRPCMethods, NetRPCResponse, NetworkMessage}; use futures::channel::oneshot; use jsonrpc_v2::{Data, Error as JsonRpcError}; use serde::Serialize; diff --git a/node/rpc/src/state_api.rs b/node/rpc/src/state_api.rs index a3bdd2028c21..306fe302ad1b 100644 --- a/node/rpc/src/state_api.rs +++ b/node/rpc/src/state_api.rs @@ -782,7 +782,7 @@ pub(crate) async fn state_miner_sector_allocated< let mas: miner::State = data .state_manager .load_actor_state(&maddr, ts.parent_state())?; - let mut allocated_sectors: bitfield::BitField = data + let allocated_sectors: bitfield::BitField = data .chain_store .db .get(&mas.allocated_sectors)? From fe72ce7dc19d2a9f729eaa527b00c00d5f1e842e Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Fri, 11 Dec 2020 01:21:32 -0500 Subject: [PATCH 5/7] license --- node/rpc/src/net_api.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/node/rpc/src/net_api.rs b/node/rpc/src/net_api.rs index 24b8c5cbebc9..9b7eed6f7183 100644 --- a/node/rpc/src/net_api.rs +++ b/node/rpc/src/net_api.rs @@ -1,3 +1,6 @@ +// Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + use crate::RpcState; use beacon::Beacon; use blockstore::BlockStore; From 60c698d685945e521b51e2854a58e30817a55bc0 Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Fri, 11 Dec 2020 01:33:48 -0500 Subject: [PATCH 6/7] clippy caught this on the CI but not locally? --- node/forest_libp2p/src/service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/forest_libp2p/src/service.rs b/node/forest_libp2p/src/service.rs index 677eb649f71b..ccb3a3bd2296 100644 --- a/node/forest_libp2p/src/service.rs +++ b/node/forest_libp2p/src/service.rs @@ -347,7 +347,7 @@ where NetRPCMethods::NetAddrsListen => { let listeners: Vec<_> = Swarm::listeners( swarm_stream.get_mut()).cloned().collect(); let peer_id = Swarm::local_peer_id(swarm_stream.get_mut()); - if let Err(_) = response_channel.send(NetRPCResponse::NetAddrsListen(peer_id.clone(), listeners)) { + if response_channel.send(NetRPCResponse::NetAddrsListen(peer_id.clone(), listeners)).is_err() { warn!("Failed to get Libp2p listeners"); } } From f8bdd7a6f35743f177ba6db114067637080229cd Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Fri, 11 Dec 2020 10:47:21 -0500 Subject: [PATCH 7/7] fix suggestions --- node/forest_libp2p/src/service.rs | 14 ++++---------- node/rpc/src/net_api.rs | 17 +++++++---------- 2 files changed, 11 insertions(+), 20 deletions(-) diff --git a/node/forest_libp2p/src/service.rs b/node/forest_libp2p/src/service.rs index ccb3a3bd2296..a18c5e9c74cf 100644 --- a/node/forest_libp2p/src/service.rs +++ b/node/forest_libp2p/src/service.rs @@ -107,17 +107,11 @@ pub enum NetworkMessage { }, JSONRPCRequest { method: NetRPCMethods, - response_channel: OneShotSender, }, } #[derive(Debug)] pub enum NetRPCMethods { - NetAddrsListen, -} -#[derive(Debug)] -pub enum NetRPCResponse { - /// Your peer id and all the multiaddrs you are listening on - NetAddrsListen(PeerId, Vec), + NetAddrsListen(OneShotSender<(PeerId, Vec)>), } /// The Libp2pService listens to events from the Libp2p swarm. pub struct Libp2pService { @@ -342,12 +336,12 @@ where self.bitswap_response_channels.insert(cid, vec![response_channel]); } } - NetworkMessage::JSONRPCRequest {method, response_channel} => { + NetworkMessage::JSONRPCRequest { method } => { match method { - NetRPCMethods::NetAddrsListen => { + NetRPCMethods::NetAddrsListen(response_channel) => { let listeners: Vec<_> = Swarm::listeners( swarm_stream.get_mut()).cloned().collect(); let peer_id = Swarm::local_peer_id(swarm_stream.get_mut()); - if response_channel.send(NetRPCResponse::NetAddrsListen(peer_id.clone(), listeners)).is_err() { + if response_channel.send((peer_id.clone(), listeners)).is_err() { warn!("Failed to get Libp2p listeners"); } } diff --git a/node/rpc/src/net_api.rs b/node/rpc/src/net_api.rs index 9b7eed6f7183..9ab3ea1a95de 100644 --- a/node/rpc/src/net_api.rs +++ b/node/rpc/src/net_api.rs @@ -4,7 +4,7 @@ use crate::RpcState; use beacon::Beacon; use blockstore::BlockStore; -use forest_libp2p::{Multiaddr, NetRPCMethods, NetRPCResponse, NetworkMessage}; +use forest_libp2p::{Multiaddr, NetRPCMethods, NetworkMessage}; use futures::channel::oneshot; use jsonrpc_v2::{Data, Error as JsonRpcError}; use serde::Serialize; @@ -26,15 +26,12 @@ pub(crate) async fn net_addrs_listen< ) -> Result { let (tx, rx) = oneshot::channel(); let req = NetworkMessage::JSONRPCRequest { - method: NetRPCMethods::NetAddrsListen, - response_channel: tx, + method: NetRPCMethods::NetAddrsListen(tx), }; data.network_send.send(req).await; - let resp = match rx.await? { - NetRPCResponse::NetAddrsListen(id, addrs) => AddrInfo { - id: id.to_string(), - addrs, - }, - }; - Ok(resp) + let (id, addrs) = rx.await?; + Ok(AddrInfo { + id: id.to_string(), + addrs, + }) }