Skip to content

Commit

Permalink
fix(rpc): better error handling for transaction submission (#2525)
Browse files Browse the repository at this point in the history
Add two experimental rpcs:
* `EXPERIMENTAL_broadcast_tx_sync` that sends the transaction and waits for its validity to be checked before returning, but doesn't wait for the transaction to be processed.
* `EXPERIMENTAL_check_tx` that checks whether a transaction is still valid. For the rpc to work, the node that the request is sent to must track the shard of the sender, otherwise it will return `Cannot determine whether the transaction is valid`.

Fixes #2039.

Test plan
---------
* `test_check_invalid_tx` that does a sanity check on `EXPERIMENTAL_check_tx`.
* pytest `rpc_tx_submission` that checks transactions can be submitted in all three ways and work well. Also check that invalid transactions can be caught by `EXPERIMENTAL_check_tx`.
  • Loading branch information
bowenwang1996 authored Apr 28, 2020
1 parent 38d54ba commit a5c92ee
Show file tree
Hide file tree
Showing 20 changed files with 212 additions and 37 deletions.
8 changes: 7 additions & 1 deletion chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1072,8 +1072,9 @@ impl Client {
&mut self,
tx: SignedTransaction,
is_forwarded: bool,
check_only: bool,
) -> NetworkClientResponses {
unwrap_or_return!(self.process_tx_internal(&tx, is_forwarded), {
unwrap_or_return!(self.process_tx_internal(&tx, is_forwarded, check_only), {
let me = self.validator_signer.as_ref().map(|vs| vs.validator_id());
warn!(target: "client", "I'm: {:?} Dropping tx: {:?}", me, tx);
NetworkClientResponses::NoResponse
Expand Down Expand Up @@ -1115,6 +1116,7 @@ impl Client {
&mut self,
tx: &SignedTransaction,
is_forwarded: bool,
check_only: bool,
) -> Result<NetworkClientResponses, Error> {
let head = self.chain.head()?;
let me = self.validator_signer.as_ref().map(|vs| vs.validator_id());
Expand Down Expand Up @@ -1160,6 +1162,8 @@ impl Client {
{
debug!(target: "client", "Invalid tx: {:?}", err);
Ok(NetworkClientResponses::InvalidTx(err))
} else if check_only {
Ok(NetworkClientResponses::ValidTx)
} else {
let active_validator = self.active_validator(shard_id)?;

Expand Down Expand Up @@ -1187,6 +1191,8 @@ impl Client {
Ok(NetworkClientResponses::NoResponse)
}
}
} else if check_only {
Ok(NetworkClientResponses::DoesNotTrackShard)
} else {
if is_forwarded {
// received forwarded transaction but we are not tracking the shard
Expand Down
4 changes: 2 additions & 2 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ impl Handler<NetworkClientMessages> for ClientActor {
_ => panic!("invalid adversary message"),
};
}
NetworkClientMessages::Transaction { transaction, is_forwarded } => {
self.client.process_tx(transaction, is_forwarded)
NetworkClientMessages::Transaction { transaction, is_forwarded, check_only } => {
self.client.process_tx(transaction, is_forwarded, check_only)
}
NetworkClientMessages::Block(block, peer_id, was_requested) => {
let blocks_at_height = self
Expand Down
2 changes: 1 addition & 1 deletion chain/client/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,7 @@ impl TestEnv {
100,
self.clients[id].chain.head().unwrap().last_block_hash,
);
self.clients[id].process_tx(tx, false)
self.clients[id].process_tx(tx, false, false)
}

pub fn restart(&mut self, id: usize) {
Expand Down
1 change: 1 addition & 0 deletions chain/client/tests/bug_repros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ fn repro_1183() {
block.header.prev_hash,
),
is_forwarded: false,
check_only: false,
});
nonce_delta += 1
}
Expand Down
1 change: 1 addition & 0 deletions chain/client/tests/catching_up.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ mod tests {
nonce, from, to, &signer, amount, block_hash,
),
is_forwarded: false,
check_only: false,
});
}

Expand Down
3 changes: 2 additions & 1 deletion chain/client/tests/challenges.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ fn test_verify_chunk_invalid_state_challenge() {
genesis_hash,
),
false,
false,
);
env.produce_block(0, 2);

Expand Down Expand Up @@ -650,7 +651,7 @@ fn test_fishermen_challenge() {
signer.public_key(),
genesis_hash,
);
env.clients[0].process_tx(stake_transaction, false);
env.clients[0].process_tx(stake_transaction, false, false);
for i in 1..=11 {
env.produce_block(0, i);
}
Expand Down
6 changes: 3 additions & 3 deletions chain/client/tests/chunks_management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,13 @@ fn chunks_produced_and_distributed_common(
let connectors_ = connectors.write().unwrap();
connectors_[0]
.0
.do_send(NetworkClientMessages::Transaction{transaction: SignedTransaction::empty(block_hash), is_forwarded:false});
.do_send(NetworkClientMessages::Transaction { transaction: SignedTransaction::empty(block_hash), is_forwarded:false, check_only: false });
connectors_[1]
.0
.do_send(NetworkClientMessages::Transaction{transaction: SignedTransaction::empty(block_hash), is_forwarded:false});
.do_send(NetworkClientMessages::Transaction { transaction: SignedTransaction::empty(block_hash), is_forwarded:false, check_only: false });
connectors_[2]
.0
.do_send(NetworkClientMessages::Transaction{transaction:SignedTransaction::empty(block_hash), is_forwarded:false});
.do_send(NetworkClientMessages::Transaction { transaction: SignedTransaction::empty(block_hash), is_forwarded:false, check_only: false });
future::ready(())
}));
})
Expand Down
1 change: 1 addition & 0 deletions chain/client/tests/cross_shard_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ mod tests {
block_hash,
),
is_forwarded: false,
check_only: false,
})
.then(move |x| {
match x.unwrap() {
Expand Down
17 changes: 9 additions & 8 deletions chain/client/tests/process_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ fn produce_blocks_with_tx() {
client.do_send(NetworkClientMessages::Transaction {
transaction: SignedTransaction::empty(block_hash),
is_forwarded: false,
check_only: false,
});
future::ready(())
}))
Expand Down Expand Up @@ -622,7 +623,7 @@ fn test_process_invalid_tx() {
);
produce_blocks(&mut client, 12);
assert_eq!(
client.process_tx(tx, false),
client.process_tx(tx, false, false),
NetworkClientResponses::InvalidTx(InvalidTxError::Expired)
);
let tx2 = SignedTransaction::new(
Expand All @@ -637,7 +638,7 @@ fn test_process_invalid_tx() {
},
);
assert_eq!(
client.process_tx(tx2, false),
client.process_tx(tx2, false, false),
NetworkClientResponses::InvalidTx(InvalidTxError::Expired)
);
}
Expand Down Expand Up @@ -949,7 +950,7 @@ fn test_tx_forwarding() {
let genesis_block = env.clients[0].chain.get_block_by_height(0).unwrap();
let genesis_hash = genesis_block.hash();
// forward to 2 chunk producers
env.clients[0].process_tx(SignedTransaction::empty(genesis_hash), false);
env.clients[0].process_tx(SignedTransaction::empty(genesis_hash), false, false);
assert_eq!(env.network_adapters[0].requests.read().unwrap().len(), 4);
}

Expand All @@ -960,7 +961,7 @@ fn test_tx_forwarding_no_double_forwarding() {
let mut env = TestEnv::new(chain_genesis, 50, 50);
let genesis_block = env.clients[0].chain.get_block_by_height(0).unwrap();
let genesis_hash = genesis_block.hash();
env.clients[0].process_tx(SignedTransaction::empty(genesis_hash), true);
env.clients[0].process_tx(SignedTransaction::empty(genesis_hash), true, false);
assert!(env.network_adapters[0].requests.read().unwrap().is_empty());
}

Expand Down Expand Up @@ -999,7 +1000,7 @@ fn test_tx_forward_around_epoch_boundary() {
signer.public_key.clone(),
genesis_hash,
);
env.clients[0].process_tx(tx, false);
env.clients[0].process_tx(tx, false, false);

for i in 1..epoch_length * 2 {
let block = env.clients[0].produce_block(i).unwrap().unwrap();
Expand All @@ -1018,7 +1019,7 @@ fn test_tx_forward_around_epoch_boundary() {
1,
genesis_hash,
);
env.clients[2].process_tx(tx, false);
env.clients[2].process_tx(tx, false, false);
let mut accounts_to_forward = HashSet::new();
for request in env.network_adapters[2].requests.read().unwrap().iter() {
if let NetworkRequests::ForwardTx(account_id, _) = request {
Expand Down Expand Up @@ -1173,7 +1174,7 @@ fn test_gas_price_change() {
- send_money_total_gas as u128 * min_gas_price,
genesis_hash,
);
env.clients[0].process_tx(tx, false);
env.clients[0].process_tx(tx, false, false);
env.produce_block(0, 1);
let tx = SignedTransaction::send_money(
2,
Expand All @@ -1183,7 +1184,7 @@ fn test_gas_price_change() {
1,
genesis_hash,
);
env.clients[0].process_tx(tx, false);
env.clients[0].process_tx(tx, false, false);
for i in 2..=4 {
env.produce_block(0, i);
}
Expand Down
2 changes: 2 additions & 0 deletions chain/jsonrpc/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ jsonrpc_client!(pub struct JsonRpcClient {
pub fn broadcast_tx_commit(&self, tx: String) -> RpcRequest<FinalExecutionOutcomeView>;
pub fn status(&self) -> RpcRequest<StatusResponse>;
#[allow(non_snake_case)]
pub fn EXPERIMENTAL_check_tx(&self, tx: String) -> RpcRequest<serde_json::Value>;
#[allow(non_snake_case)]
pub fn EXPERIMENTAL_genesis_config(&self) -> RpcRequest<serde_json::Value>;
pub fn health(&self) -> RpcRequest<()>;
pub fn tx(&self, hash: String, account_id: String) -> RpcRequest<FinalExecutionOutcomeView>;
Expand Down
90 changes: 76 additions & 14 deletions chain/jsonrpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ use near_network::{NetworkClientMessages, NetworkClientResponses};
use near_primitives::errors::{InvalidTxError, TxExecutionError};
use near_primitives::hash::CryptoHash;
use near_primitives::rpc::{
RpcGenesisRecordsRequest, RpcQueryRequest, RpcStateChangesInBlockRequest,
RpcStateChangesInBlockResponse, RpcStateChangesRequest, RpcStateChangesResponse,
RpcBroadcastTxSyncResponse, RpcGenesisRecordsRequest, RpcQueryRequest,
RpcStateChangesInBlockRequest, RpcStateChangesInBlockResponse, RpcStateChangesRequest,
RpcStateChangesResponse,
};
use near_primitives::serialize::{from_base, from_base64, BaseEncode};
use near_primitives::transaction::SignedTransaction;
Expand Down Expand Up @@ -129,6 +130,7 @@ pub enum ServerError {
TxExecutionError(TxExecutionError),
Timeout,
Closed,
InternalError,
}

impl Display for ServerError {
Expand All @@ -137,6 +139,7 @@ impl Display for ServerError {
ServerError::TxExecutionError(e) => write!(f, "ServerError: {}", e),
ServerError::Timeout => write!(f, "ServerError: Timeout"),
ServerError::Closed => write!(f, "ServerError: Closed"),
ServerError::InternalError => write!(f, "ServerError: Internal Error"),
}
}
}
Expand Down Expand Up @@ -208,7 +211,9 @@ impl JsonRpcHandler {

match request.method.as_ref() {
"broadcast_tx_async" => self.send_tx_async(request.params).await,
"EXPERIMENTAL_broadcast_tx_sync" => self.send_tx_sync(request.params).await,
"broadcast_tx_commit" => self.send_tx_commit(request.params).await,
"EXPERIMENTAL_check_tx" => self.check_tx(request.params).await,
"validators" => self.validators(request.params).await,
"query" => self.query(request.params).await,
"health" => self.health().await,
Expand All @@ -230,11 +235,11 @@ impl JsonRpcHandler {
async fn send_tx_async(&self, params: Option<Value>) -> Result<Value, RpcError> {
let tx = parse_tx(params)?;
let hash = (&tx.get_hash()).to_base();
actix::spawn(
self.client_addr
.send(NetworkClientMessages::Transaction { transaction: tx, is_forwarded: false })
.map(drop),
);
self.client_addr.do_send(NetworkClientMessages::Transaction {
transaction: tx,
is_forwarded: false,
check_only: false,
});
Ok(Value::String(hash))
}

Expand Down Expand Up @@ -283,16 +288,73 @@ impl JsonRpcHandler {
.map_err(|_| timeout_err())?
}

async fn send_tx(
&self,
tx: SignedTransaction,
check_only: bool,
) -> Result<NetworkClientResponses, RpcError> {
Ok(self
.client_addr
.send(NetworkClientMessages::Transaction {
transaction: tx,
is_forwarded: false,
check_only,
})
.map_err(|err| RpcError::server_error(Some(ServerError::from(err))))
.await?)
}

async fn send_tx_sync(&self, params: Option<Value>) -> Result<Value, RpcError> {
self.send_or_check_tx(params, false).await
}

async fn check_tx(&self, params: Option<Value>) -> Result<Value, RpcError> {
self.send_or_check_tx(params, true).await
}

async fn send_or_check_tx(
&self,
params: Option<Value>,
check_only: bool,
) -> Result<Value, RpcError> {
let tx = parse_tx(params)?;
let tx_hash = (&tx.get_hash()).to_base();
match self.send_tx(tx, check_only).await? {
NetworkClientResponses::ValidTx => {
if check_only {
Ok(Value::Null)
} else {
jsonify(Ok(Ok(RpcBroadcastTxSyncResponse {
transaction_hash: tx_hash,
is_routed: false,
})))
}
}
NetworkClientResponses::RequestRouted => {
if check_only {
Err(RpcError::server_error(Some("Node doesn't track this shard. Cannot determine whether the transaction is valid".to_string())))
} else {
jsonify(Ok(Ok(RpcBroadcastTxSyncResponse {
transaction_hash: tx_hash,
is_routed: true,
})))
}
}
NetworkClientResponses::InvalidTx(err) => {
Err(RpcError::server_error(Some(ServerError::TxExecutionError(err.into()))))
}
_ => {
// this is only possible if something went wrong with the node internally.
Err(RpcError::server_error(Some(ServerError::InternalError)))
}
}
}

async fn send_tx_commit(&self, params: Option<Value>) -> Result<Value, RpcError> {
let tx = parse_tx(params)?;
let tx_hash = tx.get_hash();
let signer_account_id = tx.transaction.signer_id.clone();
let result = self
.client_addr
.send(NetworkClientMessages::Transaction { transaction: tx, is_forwarded: false })
.map_err(|err| RpcError::server_error(Some(ServerError::from(err))))
.await?;
match result {
match self.send_tx(tx, false).await? {
NetworkClientResponses::ValidTx | NetworkClientResponses::RequestRouted => {
self.tx_polling(tx_hash, signer_account_id).await
}
Expand All @@ -302,7 +364,7 @@ impl JsonRpcHandler {
NetworkClientResponses::NoResponse => {
Err(RpcError::server_error(Some(ServerError::Timeout)))
}
_ => unreachable!(),
_ => Err(RpcError::server_error(Some(ServerError::InternalError))),
}
}

Expand Down
25 changes: 25 additions & 0 deletions chain/jsonrpc/tests/rpc_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,28 @@ fn test_tx_status_missing_tx() {
}
});
}

#[test]
fn test_check_invalid_tx() {
test_with_client!(test_utils::NodeType::Validator, client, async move {
let signer = InMemorySigner::from_seed("test1", KeyType::ED25519, "test1");
// invalid base hash
let tx = SignedTransaction::send_money(
1,
"test1".to_string(),
"test2".to_string(),
&signer,
100,
hash(&[1]),
);
let bytes = tx.try_to_vec().unwrap();
match client.EXPERIMENTAL_check_tx(to_base64(&bytes)).await {
Err(e) => {
let s = serde_json::to_string(&e.data.unwrap()).unwrap();
println!("{}", s);
assert_eq!(s, "{\"TxExecutionError\":{\"InvalidTxError\":\"Expired\"}}");
}
Ok(_) => panic!("transaction should not succeed"),
}
});
}
Loading

0 comments on commit a5c92ee

Please sign in to comment.