Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(rpc): better error handling for transaction submission #2525

Merged
merged 4 commits into from
Apr 28, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1072,8 +1072,9 @@ impl Client {
&mut self,
tx: SignedTransaction,
is_forwarded: bool,
check_only: bool,
) -> NetworkClientResponses {
unwrap_or_return!(self.process_tx_internal(&tx, is_forwarded), {
unwrap_or_return!(self.process_tx_internal(&tx, is_forwarded, check_only), {
let me = self.validator_signer.as_ref().map(|vs| vs.validator_id());
warn!(target: "client", "I'm: {:?} Dropping tx: {:?}", me, tx);
NetworkClientResponses::NoResponse
Expand Down Expand Up @@ -1115,6 +1116,7 @@ impl Client {
&mut self,
tx: &SignedTransaction,
is_forwarded: bool,
check_only: bool,
) -> Result<NetworkClientResponses, Error> {
let head = self.chain.head()?;
let me = self.validator_signer.as_ref().map(|vs| vs.validator_id());
Expand Down Expand Up @@ -1160,6 +1162,8 @@ impl Client {
{
debug!(target: "client", "Invalid tx: {:?}", err);
Ok(NetworkClientResponses::InvalidTx(err))
} else if check_only {
Ok(NetworkClientResponses::ValidTx)
} else {
let active_validator = self.active_validator(shard_id)?;

Expand Down Expand Up @@ -1187,6 +1191,8 @@ impl Client {
Ok(NetworkClientResponses::NoResponse)
}
}
} else if check_only {
Ok(NetworkClientResponses::DoesNotTrackShard)
} else {
if is_forwarded {
// received forwarded transaction but we are not tracking the shard
Expand Down
4 changes: 2 additions & 2 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ impl Handler<NetworkClientMessages> for ClientActor {
_ => panic!("invalid adversary message"),
};
}
NetworkClientMessages::Transaction { transaction, is_forwarded } => {
self.client.process_tx(transaction, is_forwarded)
NetworkClientMessages::Transaction { transaction, is_forwarded, check_only } => {
self.client.process_tx(transaction, is_forwarded, check_only)
}
NetworkClientMessages::Block(block, peer_id, was_requested) => {
let blocks_at_height = self
Expand Down
2 changes: 1 addition & 1 deletion chain/client/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,7 @@ impl TestEnv {
100,
self.clients[id].chain.head().unwrap().last_block_hash,
);
self.clients[id].process_tx(tx, false)
self.clients[id].process_tx(tx, false, false)
frol marked this conversation as resolved.
Show resolved Hide resolved
}

pub fn restart(&mut self, id: usize) {
Expand Down
1 change: 1 addition & 0 deletions chain/client/tests/bug_repros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ fn repro_1183() {
block.header.prev_hash,
),
is_forwarded: false,
check_only: false,
});
nonce_delta += 1
}
Expand Down
1 change: 1 addition & 0 deletions chain/client/tests/catching_up.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ mod tests {
nonce, from, to, &signer, amount, block_hash,
),
is_forwarded: false,
check_only: false,
});
}

Expand Down
3 changes: 2 additions & 1 deletion chain/client/tests/challenges.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ fn test_verify_chunk_invalid_state_challenge() {
genesis_hash,
),
false,
false,
);
env.produce_block(0, 2);

Expand Down Expand Up @@ -650,7 +651,7 @@ fn test_fishermen_challenge() {
signer.public_key(),
genesis_hash,
);
env.clients[0].process_tx(stake_transaction, false);
env.clients[0].process_tx(stake_transaction, false, false);
for i in 1..=11 {
env.produce_block(0, i);
}
Expand Down
6 changes: 3 additions & 3 deletions chain/client/tests/chunks_management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,13 @@ fn chunks_produced_and_distributed_common(
let connectors_ = connectors.write().unwrap();
connectors_[0]
.0
.do_send(NetworkClientMessages::Transaction{transaction: SignedTransaction::empty(block_hash), is_forwarded:false});
.do_send(NetworkClientMessages::Transaction { transaction: SignedTransaction::empty(block_hash), is_forwarded:false, check_only: false });
connectors_[1]
.0
.do_send(NetworkClientMessages::Transaction{transaction: SignedTransaction::empty(block_hash), is_forwarded:false});
.do_send(NetworkClientMessages::Transaction { transaction: SignedTransaction::empty(block_hash), is_forwarded:false, check_only: false });
connectors_[2]
.0
.do_send(NetworkClientMessages::Transaction{transaction:SignedTransaction::empty(block_hash), is_forwarded:false});
.do_send(NetworkClientMessages::Transaction { transaction: SignedTransaction::empty(block_hash), is_forwarded:false, check_only: false });
future::ready(())
}));
})
Expand Down
1 change: 1 addition & 0 deletions chain/client/tests/cross_shard_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ mod tests {
block_hash,
),
is_forwarded: false,
check_only: false,
})
.then(move |x| {
match x.unwrap() {
Expand Down
17 changes: 9 additions & 8 deletions chain/client/tests/process_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ fn produce_blocks_with_tx() {
client.do_send(NetworkClientMessages::Transaction {
transaction: SignedTransaction::empty(block_hash),
is_forwarded: false,
check_only: false,
});
future::ready(())
}))
Expand Down Expand Up @@ -622,7 +623,7 @@ fn test_process_invalid_tx() {
);
produce_blocks(&mut client, 12);
assert_eq!(
client.process_tx(tx, false),
client.process_tx(tx, false, false),
NetworkClientResponses::InvalidTx(InvalidTxError::Expired)
);
let tx2 = SignedTransaction::new(
Expand All @@ -637,7 +638,7 @@ fn test_process_invalid_tx() {
},
);
assert_eq!(
client.process_tx(tx2, false),
client.process_tx(tx2, false, false),
NetworkClientResponses::InvalidTx(InvalidTxError::Expired)
);
}
Expand Down Expand Up @@ -949,7 +950,7 @@ fn test_tx_forwarding() {
let genesis_block = env.clients[0].chain.get_block_by_height(0).unwrap();
let genesis_hash = genesis_block.hash();
// forward to 2 chunk producers
env.clients[0].process_tx(SignedTransaction::empty(genesis_hash), false);
env.clients[0].process_tx(SignedTransaction::empty(genesis_hash), false, false);
assert_eq!(env.network_adapters[0].requests.read().unwrap().len(), 4);
}

Expand All @@ -960,7 +961,7 @@ fn test_tx_forwarding_no_double_forwarding() {
let mut env = TestEnv::new(chain_genesis, 50, 50);
let genesis_block = env.clients[0].chain.get_block_by_height(0).unwrap();
let genesis_hash = genesis_block.hash();
env.clients[0].process_tx(SignedTransaction::empty(genesis_hash), true);
env.clients[0].process_tx(SignedTransaction::empty(genesis_hash), true, false);
assert!(env.network_adapters[0].requests.read().unwrap().is_empty());
}

Expand Down Expand Up @@ -999,7 +1000,7 @@ fn test_tx_forward_around_epoch_boundary() {
signer.public_key.clone(),
genesis_hash,
);
env.clients[0].process_tx(tx, false);
env.clients[0].process_tx(tx, false, false);

for i in 1..epoch_length * 2 {
let block = env.clients[0].produce_block(i).unwrap().unwrap();
Expand All @@ -1018,7 +1019,7 @@ fn test_tx_forward_around_epoch_boundary() {
1,
genesis_hash,
);
env.clients[2].process_tx(tx, false);
env.clients[2].process_tx(tx, false, false);
let mut accounts_to_forward = HashSet::new();
for request in env.network_adapters[2].requests.read().unwrap().iter() {
if let NetworkRequests::ForwardTx(account_id, _) = request {
Expand Down Expand Up @@ -1173,7 +1174,7 @@ fn test_gas_price_change() {
- send_money_total_gas as u128 * min_gas_price,
genesis_hash,
);
env.clients[0].process_tx(tx, false);
env.clients[0].process_tx(tx, false, false);
env.produce_block(0, 1);
let tx = SignedTransaction::send_money(
2,
Expand All @@ -1183,7 +1184,7 @@ fn test_gas_price_change() {
1,
genesis_hash,
);
env.clients[0].process_tx(tx, false);
env.clients[0].process_tx(tx, false, false);
for i in 2..=4 {
env.produce_block(0, i);
}
Expand Down
2 changes: 2 additions & 0 deletions chain/jsonrpc/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ jsonrpc_client!(pub struct JsonRpcClient {
pub fn broadcast_tx_commit(&self, tx: String) -> RpcRequest<FinalExecutionOutcomeView>;
pub fn status(&self) -> RpcRequest<StatusResponse>;
#[allow(non_snake_case)]
pub fn EXPERIMENTAL_check_tx(&self, tx: String) -> RpcRequest<serde_json::Value>;
#[allow(non_snake_case)]
pub fn EXPERIMENTAL_genesis_config(&self) -> RpcRequest<serde_json::Value>;
pub fn health(&self) -> RpcRequest<()>;
pub fn tx(&self, hash: String, account_id: String) -> RpcRequest<FinalExecutionOutcomeView>;
Expand Down
84 changes: 68 additions & 16 deletions chain/jsonrpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ use near_network::types::{NetworkAdversarialMessage, NetworkViewClientMessages};
use near_network::{NetworkClientMessages, NetworkClientResponses};
use near_primitives::errors::{InvalidTxError, TxExecutionError};
use near_primitives::hash::CryptoHash;
use near_primitives::rpc::{
RpcGenesisRecordsRequest, RpcQueryRequest, RpcStateChangesInBlockRequest,
RpcStateChangesInBlockResponse, RpcStateChangesRequest, RpcStateChangesResponse,
};
use near_primitives::rpc::{RpcGenesisRecordsRequest, RpcQueryRequest, RpcStateChangesInBlockRequest, RpcStateChangesInBlockResponse, RpcStateChangesRequest, RpcStateChangesResponse, RpcBroadcastTxSyncResponse};
use near_primitives::serialize::{from_base, from_base64, BaseEncode};
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::{AccountId, BlockId, BlockIdOrFinality, MaybeBlockId};
Expand Down Expand Up @@ -129,6 +126,7 @@ pub enum ServerError {
TxExecutionError(TxExecutionError),
Timeout,
Closed,
InternalError,
}

impl Display for ServerError {
Expand All @@ -137,6 +135,7 @@ impl Display for ServerError {
ServerError::TxExecutionError(e) => write!(f, "ServerError: {}", e),
ServerError::Timeout => write!(f, "ServerError: Timeout"),
ServerError::Closed => write!(f, "ServerError: Closed"),
ServerError::InternalError => write!(f, "ServerError: Internal Error"),
}
}
}
Expand Down Expand Up @@ -208,7 +207,9 @@ impl JsonRpcHandler {

match request.method.as_ref() {
"broadcast_tx_async" => self.send_tx_async(request.params).await,
"EXPERIMENTAL_broadcast_tx_sync" => self.send_tx_sync(request.params).await,
"broadcast_tx_commit" => self.send_tx_commit(request.params).await,
"EXPERIMENTAL_check_tx" => self.check_tx(request.params).await,
"validators" => self.validators(request.params).await,
"query" => self.query(request.params).await,
"health" => self.health().await,
Expand All @@ -230,11 +231,11 @@ impl JsonRpcHandler {
async fn send_tx_async(&self, params: Option<Value>) -> Result<Value, RpcError> {
let tx = parse_tx(params)?;
let hash = (&tx.get_hash()).to_base();
actix::spawn(
self.client_addr
.send(NetworkClientMessages::Transaction { transaction: tx, is_forwarded: false })
.map(drop),
);
self.client_addr.do_send(NetworkClientMessages::Transaction {
transaction: tx,
is_forwarded: false,
check_only: false,
});
Ok(Value::String(hash))
}

Expand Down Expand Up @@ -283,16 +284,67 @@ impl JsonRpcHandler {
.map_err(|_| timeout_err())?
}

async fn send_tx(
&self,
tx: SignedTransaction,
check_only: bool,
) -> Result<NetworkClientResponses, RpcError> {
Ok(self
.client_addr
.send(NetworkClientMessages::Transaction {
transaction: tx,
is_forwarded: false,
check_only,
})
.map_err(|err| RpcError::server_error(Some(ServerError::from(err))))
.await?)
}

async fn send_tx_sync(&self, params: Option<Value>) -> Result<Value, RpcError> {
self.send_or_check_tx(params, false).await
}

async fn check_tx(&self, params: Option<Value>) -> Result<Value, RpcError> {
self.send_or_check_tx(params, true).await
}

async fn send_or_check_tx(
&self,
params: Option<Value>,
check_only: bool,
) -> Result<Value, RpcError> {
let tx = parse_tx(params)?;
let tx_hash = (&tx.get_hash()).to_base();
frol marked this conversation as resolved.
Show resolved Hide resolved
match self.send_tx(tx, check_only).await? {
NetworkClientResponses::ValidTx => {
if check_only {
Ok(Value::Null)
} else {
jsonify(Ok(Ok(RpcBroadcastTxSyncResponse { transaction_hash: tx_hash, is_routed: false })))
}
}
NetworkClientResponses::RequestRouted => {
if check_only {
Err(RpcError::server_error(Some("Node doesn't track this shard. Cannot determine whether the transaction is valid".to_string())))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bowenwang1996
What does this error mean for user?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fckt Good catch.

We should help users to understand that they are expected to reach another node that tracks the shard.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frol how to better phrase it? Should we include something like "try another node"?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking the node must be found automatically? (maybe the request should contain a sender_id?) How to use this method?

} else {
jsonify(Ok(Ok(RpcBroadcastTxSyncResponse { transaction_hash: tx_hash, is_routed: true })))
}
}
NetworkClientResponses::InvalidTx(err) => {
Err(RpcError::server_error(Some(ServerError::TxExecutionError(err.into()))))
}
_ => {
// this is only possible if something went wrong with the node internally.
Err(RpcError::server_error(Some(ServerError::InternalError)))
}
}
}

async fn send_tx_commit(&self, params: Option<Value>) -> Result<Value, RpcError> {
let tx = parse_tx(params)?;
let tx_hash = tx.get_hash();
let signer_account_id = tx.transaction.signer_id.clone();
let result = self
.client_addr
.send(NetworkClientMessages::Transaction { transaction: tx, is_forwarded: false })
.map_err(|err| RpcError::server_error(Some(ServerError::from(err))))
.await?;
match result {
match self.send_tx(tx, false).await? {
NetworkClientResponses::ValidTx | NetworkClientResponses::RequestRouted => {
self.tx_polling(tx_hash, signer_account_id).await
}
Expand All @@ -302,7 +354,7 @@ impl JsonRpcHandler {
NetworkClientResponses::NoResponse => {
Err(RpcError::server_error(Some(ServerError::Timeout)))
}
_ => unreachable!(),
_ => Err(RpcError::server_error(Some(ServerError::InternalError))),
}
}

Expand Down
25 changes: 25 additions & 0 deletions chain/jsonrpc/tests/rpc_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,28 @@ fn test_tx_status_missing_tx() {
}
});
}

#[test]
fn test_check_invalid_tx() {
test_with_client!(test_utils::NodeType::Validator, client, async move {
let signer = InMemorySigner::from_seed("test1", KeyType::ED25519, "test1");
// invalid base hash
let tx = SignedTransaction::send_money(
1,
"test1".to_string(),
"test2".to_string(),
&signer,
100,
hash(&[1]),
);
let bytes = tx.try_to_vec().unwrap();
match client.EXPERIMENTAL_check_tx(to_base64(&bytes)).await {
Err(e) => {
let s = serde_json::to_string(&e.data.unwrap()).unwrap();
println!("{}", s);
assert_eq!(s, "{\"TxExecutionError\":{\"InvalidTxError\":\"Expired\"}}");
}
Ok(_) => panic!("transaction should not succeed"),
}
});
}
12 changes: 10 additions & 2 deletions chain/network/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,11 @@ impl Peer {
}
PeerMessage::Transaction(transaction) => {
near_metrics::inc_counter(&metrics::PEER_TRANSACTION_RECEIVED_TOTAL);
NetworkClientMessages::Transaction { transaction, is_forwarded: false }
NetworkClientMessages::Transaction {
transaction,
is_forwarded: false,
check_only: false,
}
}
PeerMessage::BlockHeaders(headers) => {
NetworkClientMessages::BlockHeaders(headers, peer_id)
Expand All @@ -428,7 +432,11 @@ impl Peer {
NetworkClientMessages::BlockApproval(approval, peer_id)
}
RoutedMessageBody::ForwardTx(transaction) => {
NetworkClientMessages::Transaction { transaction, is_forwarded: true }
NetworkClientMessages::Transaction {
transaction,
is_forwarded: true,
check_only: false,
}
}

RoutedMessageBody::StateResponse(info) => {
Expand Down
Loading