From b856f94c1f31e13430f210ed4b34ba1acb04b7df Mon Sep 17 00:00:00 2001 From: Pavitthra Pandurangan Date: Mon, 15 Mar 2021 14:35:14 -0700 Subject: [PATCH 1/9] Mempool transactions evaluated nonce-order --- src/core/mempool.rs | 248 ++++++++++++++++++++++++++++---------------- 1 file changed, 158 insertions(+), 90 deletions(-) diff --git a/src/core/mempool.rs b/src/core/mempool.rs index fb491d3176..27712f7b0f 100644 --- a/src/core/mempool.rs +++ b/src/core/mempool.rs @@ -400,34 +400,67 @@ impl MemPoolDB { }) } + /// Gets the lowest ancestor block(where `height > curr_tip_height && height <= max_tip_height`) + /// that is in the mempool of the "max" block (identified by `max_tip_block_hash` and + /// `max_tip_consensus_hash`). If found, this function returns a `MemPoolWalkResult::Chainstate` + /// enum along with identifying information about the ancestor block: its height + /// (`next_height`), its consensus hash, its block hash, and the earliest timestamp of the + /// transactions in it that are also in the mempool. + /// + /// If the mempool has tips at a certain height (but not an ancestor of the "max" block), the + /// function returns a `MemPoolWalkResult::NoneAtHeight` enum with that height (`next_height`). + /// + /// If there is no block height that satisfies + /// `height > curr_tip_height && height <= max_tip_height`, if the height of the lowest + /// block found matches the curr_tip_height, or if the function is unable to find the ancestor + /// of the "max" block at `next_height`, the `MemPoolWalkResult::Done` enum is returned. + /// + /// # Parameters + /// - `curr_tip_height`: setting `curr_tip_height` to `None` is equivalent to searching for + /// blocks with `height >= 0`. Setting `curr_tip_height` to `Some(0)` is equivalent to + /// searching for blocks with `height > 0` instead. + /// - `max_{}`: these parameters identify the max (or highest) block in the search. The search + /// is inclusive of this block. The function will only return a block (through the Chainstate + /// enum) that is either an ancestor of this block or the block itself. fn walk( &self, chainstate: &mut StacksChainState, tip_consensus_hash: &ConsensusHash, tip_block_hash: &BlockHeaderHash, - tip_height: u64, + curr_tip_height: Option, + max_tip_consensus_hash: &ConsensusHash, + max_tip_block_hash: &BlockHeaderHash, + max_tip_height: u64, ) -> Result { - // Walk back to the next-highest - // ancestor of this tip, and see if we can include anything from there. - let next_height = match MemPoolDB::get_previous_block_height(&self.db, tip_height)? { + // Get the height of the block with the lowest height that is in the mempool + let next_height = match MemPoolDB::get_lowest_block_height_in_mempool( + &self.db, + max_tip_height, + curr_tip_height, + )? { Some(next_height) => next_height, None => { - debug!("Done scanning mempool: no transactions left"; "height" => tip_height); + debug!("Done scanning mempool: no transactions left"; "height" => curr_tip_height); return Ok(MemPoolWalkResult::Done); } }; - if next_height == 0 && tip_height == 0 { - // we're done -- tried every tx - debug!("Done scanning mempool: at height 0"); - return Ok(MemPoolWalkResult::Done); + + if let Some(h) = curr_tip_height { + if next_height == max_tip_height && h == max_tip_height { + // we're done -- tried every tx + debug!("Done scanning mempool: at height {}", max_tip_height); + return Ok(MemPoolWalkResult::Done); + } } let next_tips = MemPoolDB::get_chain_tips_at_height(&self.db, next_height)?; let ancestor_tip = { let headers_conn = chainstate.index_conn()?; - let index_block = - StacksBlockHeader::make_index_block_hash(tip_consensus_hash, tip_block_hash); + let index_block = StacksBlockHeader::make_index_block_hash( + max_tip_consensus_hash, + max_tip_block_hash, + ); match StacksChainState::get_index_tip_ancestor_conn( &headers_conn, &index_block, @@ -436,7 +469,8 @@ impl MemPoolDB { Some(tip_info) => tip_info, None => { // no ancestor at the height, this is a error because this shouldn't ever - // happen: a chain tip should have an ancestor at every height < than its own height + // happen: a chain tip should have an ancestor at every height < than its own + // height error!( "Done scanning mempool: no known ancestor of tip at height"; "height" => next_height, @@ -469,15 +503,15 @@ impl MemPoolDB { } if !found { - // no ancestor at height, try an earlier height + // no ancestor at height, try a later height debug!( "None of the available prior chain tips at {} is an ancestor of {}/{}", - next_height, tip_consensus_hash, tip_block_hash + next_height, max_tip_consensus_hash, max_tip_block_hash ); return Ok(MemPoolWalkResult::NoneAtHeight( ancestor_tip.consensus_hash, ancestor_bh, - tip_height - 1, + next_height, )); } @@ -509,7 +543,7 @@ impl MemPoolDB { /// Iterate over candidates in the mempool /// todo will be called once for each bundle of transactions at /// each ancestor chain tip from the given one, starting with the - /// most recent chain tip and working backwards until there are + /// least recent chain tip and working forwards until there are /// no more transactions to consider. Each batch of transactions /// passed to todo will be sorted in nonce order. pub fn iterate_candidates( @@ -524,95 +558,116 @@ impl MemPoolDB { F: FnMut(Vec) -> Result<(), E>, E: From + From, { - let (mut tip_consensus_hash, mut tip_block_hash, mut tip_height) = ( + let ( + mut curr_tip_consensus_hash, + max_tip_consensus_hash, + mut curr_tip_block_hash, + max_tip_block_hash, + max_tip_height, + mut curr_tip_height, + ) = ( + tip_consensus_hash.clone(), tip_consensus_hash.clone(), tip_block_hash.clone(), + tip_block_hash.clone(), tip_height, + None, ); debug!( - "Begin scanning transaction mempool at {}/{} height={}", - &tip_consensus_hash, &tip_block_hash, tip_height + "Begin scanning transaction mempool at {}/{} height={:?}", + &curr_tip_consensus_hash, &curr_tip_block_hash, curr_tip_height ); - let mut next_timestamp = - match MemPoolDB::get_next_timestamp(&self.db, &tip_consensus_hash, &tip_block_hash, 0)? - { - Some(ts) => ts, - None => loop { - // walk back to where the first transaction we can mine can be found - match self.walk(chainstate, &tip_consensus_hash, &tip_block_hash, tip_height)? { - MemPoolWalkResult::Chainstate( - next_consensus_hash, - next_block_bhh, - next_height, - next_timestamp, - ) => { - tip_consensus_hash = next_consensus_hash; - tip_block_hash = next_block_bhh; - tip_height = next_height; - break next_timestamp; - } - MemPoolWalkResult::NoneAtHeight( - next_consensus_hash, - next_block_hash, - next_height, - ) => { - if std::env::var("STACKS_MEMPOOL_BAD_BEHAVIOR") == Ok("1".into()) { - warn!( - "Stopping mempool walk because no mempool entries at height = {}", - next_height - 1 - ); - return Ok(()); - } else { - tip_consensus_hash = next_consensus_hash; - tip_block_hash = next_block_hash; - tip_height = next_height; - } - } - MemPoolWalkResult::Done => { - return Ok(()); - } + let mut next_timestamp = loop { + // walk to where the first transaction we can mine can be found + match self.walk( + chainstate, + &curr_tip_consensus_hash, + &curr_tip_block_hash, + curr_tip_height, + &max_tip_consensus_hash, + &max_tip_block_hash, + max_tip_height, + )? { + MemPoolWalkResult::Chainstate( + next_consensus_hash, + next_block_bhh, + next_height, + next_timestamp, + ) => { + curr_tip_consensus_hash = next_consensus_hash; + curr_tip_block_hash = next_block_bhh; + curr_tip_height = Some(next_height); + break next_timestamp; + } + MemPoolWalkResult::NoneAtHeight( + next_consensus_hash, + next_block_hash, + next_height, + ) => { + if std::env::var("STACKS_MEMPOOL_BAD_BEHAVIOR") == Ok("1".into()) { + warn!( + "Stopping mempool walk because no mempool entries at height = {}", + next_height + ); + return Ok(()); + } else { + curr_tip_consensus_hash = next_consensus_hash; + curr_tip_block_hash = next_block_hash; + curr_tip_height = Some(next_height); } - }, - }; + } + MemPoolWalkResult::Done => { + return Ok(()); + } + } + }; loop { let available_txs = MemPoolDB::get_txs_at( &self.db, - &tip_consensus_hash, - &tip_block_hash, + &curr_tip_consensus_hash, + &curr_tip_block_hash, next_timestamp, )?; debug!( - "Have {} transactions at {}/{} height={} at or after {}", + "Have {} transactions at {}/{} height={:?} at or after {}", available_txs.len(), - &tip_consensus_hash, - &tip_block_hash, - tip_height, + &curr_tip_consensus_hash, + &curr_tip_block_hash, + curr_tip_height, next_timestamp ); todo(available_txs)?; next_timestamp = match MemPoolDB::get_next_timestamp( &self.db, - &tip_consensus_hash, - &tip_block_hash, - next_timestamp, + &curr_tip_consensus_hash, + &curr_tip_block_hash, + next_timestamp.clone(), )? { Some(ts) => ts, None => loop { // walk back - match self.walk(chainstate, &tip_consensus_hash, &tip_block_hash, tip_height)? { + match self.walk( + chainstate, + &curr_tip_consensus_hash, + &curr_tip_block_hash, + curr_tip_height, + &max_tip_consensus_hash, + &max_tip_block_hash, + max_tip_height, + )? { MemPoolWalkResult::Chainstate( next_consensus_hash, next_block_bhh, next_height, next_timestamp, ) => { - tip_consensus_hash = next_consensus_hash; - tip_block_hash = next_block_bhh; - tip_height = next_height; + curr_tip_consensus_hash = next_consensus_hash; + curr_tip_block_hash = next_block_bhh; + curr_tip_height = Some(next_height); break next_timestamp; } MemPoolWalkResult::NoneAtHeight( @@ -623,13 +678,13 @@ impl MemPoolDB { if std::env::var("STACKS_MEMPOOL_BAD_BEHAVIOR") == Ok("1".into()) { warn!( "Stopping mempool walk because no mempool entries at height = {}", - next_height - 1 + next_height ); return Ok(()); } else { - tip_consensus_hash = next_consensus_hash; - tip_block_hash = next_block_hash; - tip_height = next_height; + curr_tip_consensus_hash = next_consensus_hash; + curr_tip_block_hash = next_block_hash; + curr_tip_height = Some(next_height); } } MemPoolWalkResult::Done => { @@ -708,6 +763,23 @@ impl MemPoolDB { query_row(conn, sql, args) } + /// Given a chain tip, find the lowest block-height with height <= max_height and height > + /// curr_height in the mempool + pub fn get_lowest_block_height_in_mempool( + conn: &DBConn, + max_height: u64, + curr_height: Option, + ) -> Result, db_error> { + let sql = "SELECT height FROM mempool WHERE height <= ?1 AND height > ?2 \ + ORDER BY height ASC LIMIT 1"; + let curr_height_sql_arg = match curr_height { + None => -1, + Some(h) => u64_to_sql(h)?, + }; + let args: &[&dyn ToSql] = &[&u64_to_sql(max_height)?, &curr_height_sql_arg]; + query_row(conn, sql, args) + } + /// Get chain tip(s) at a given height that have transactions pub fn get_chain_tips_at_height( conn: &DBConn, @@ -1317,13 +1389,13 @@ mod tests { &TransactionPostConditionMode::Allow, ); - let blocks_to_broadcast_in = [&b_1, &b_2]; - let mut txs = [txs.pop().unwrap(), txs.pop().unwrap()]; + let blocks_to_broadcast_in = [&b_1, &b_2, &b_4]; + let mut txs = [txs.pop().unwrap(), txs.pop().unwrap(), txs.pop().unwrap()]; for tx in txs.iter_mut() { tx.set_tx_fee(123); } - for ix in 0..2 { + for ix in 0..3 { let mut mempool_tx = mempool.tx_begin().unwrap(); let block = &blocks_to_broadcast_in[ix]; @@ -1340,7 +1412,6 @@ mod tests { let txid = tx.txid(); let tx_bytes = tx.serialize_to_vec(); - let tx_fee = tx.get_tx_fee(); let height = 1 + ix as u64; @@ -1428,10 +1499,7 @@ mod tests { }, ) .unwrap(); - assert_eq!( - count_txs, 1, - "Mempool should find one transactions from b_3" - ); + assert_eq!(count_txs, 1, "Mempool should find one transaction from b_3"); let mut count_txs = 0; mempool @@ -1447,8 +1515,8 @@ mod tests { ) .unwrap(); assert_eq!( - count_txs, 1, - "Mempool should find one transactions from b_4" + count_txs, 2, + "Mempool should find two transactions from b_4" ); // let's test replace-across-fork while we're here. @@ -1513,7 +1581,7 @@ mod tests { .unwrap(); assert_eq!( count_txs, 1, - "After replace, mempool should find one transactions from b_2" + "After replace, mempool should find one transaction from b_2" ); let mut count_txs = 0; @@ -1531,7 +1599,7 @@ mod tests { .unwrap(); assert_eq!( count_txs, 1, - "After replace, mempool should find one transactions from b_5" + "After replace, mempool should find one transaction from b_5" ); let mut count_txs = 0; @@ -1548,8 +1616,8 @@ mod tests { ) .unwrap(); assert_eq!( - count_txs, 2, - "After replace, mempool should find *two* transactions from b_4" + count_txs, 3, + "After replace, mempool should find *three* transactions from b_4" ); } From 2e669abfc16b6fa210b4beb4cf518762ab3eda21 Mon Sep 17 00:00:00 2001 From: Pavitthra Pandurangan Date: Mon, 15 Mar 2021 15:07:00 -0700 Subject: [PATCH 2/9] removed duped code, updated debug statement, removed unnecessary clone --- src/core/mempool.rs | 201 +++++++++++++++++++++++++------------------- 1 file changed, 114 insertions(+), 87 deletions(-) diff --git a/src/core/mempool.rs b/src/core/mempool.rs index 27712f7b0f..e925aaadfd 100644 --- a/src/core/mempool.rs +++ b/src/core/mempool.rs @@ -422,7 +422,7 @@ impl MemPoolDB { /// - `max_{}`: these parameters identify the max (or highest) block in the search. The search /// is inclusive of this block. The function will only return a block (through the Chainstate /// enum) that is either an ancestor of this block or the block itself. - fn walk( + fn walk_forwards( &self, chainstate: &mut StacksChainState, tip_consensus_hash: &ConsensusHash, @@ -539,49 +539,22 @@ impl MemPoolDB { )) } - /// - /// Iterate over candidates in the mempool - /// todo will be called once for each bundle of transactions at - /// each ancestor chain tip from the given one, starting with the - /// least recent chain tip and working forwards until there are - /// no more transactions to consider. Each batch of transactions - /// passed to todo will be sorted in nonce order. - pub fn iterate_candidates( + fn get_next_timestamp_at_next_block( &self, - tip_consensus_hash: &ConsensusHash, - tip_block_hash: &BlockHeaderHash, - tip_height: u64, chainstate: &mut StacksChainState, - mut todo: F, - ) -> Result<(), E> + mut curr_tip_consensus_hash: ConsensusHash, + mut curr_tip_block_hash: BlockHeaderHash, + mut curr_tip_height: Option, + max_tip_consensus_hash: &ConsensusHash, + max_tip_block_hash: &BlockHeaderHash, + max_tip_height: u64, + ) -> Result, E> where - F: FnMut(Vec) -> Result<(), E>, E: From + From, { - let ( - mut curr_tip_consensus_hash, - max_tip_consensus_hash, - mut curr_tip_block_hash, - max_tip_block_hash, - max_tip_height, - mut curr_tip_height, - ) = ( - tip_consensus_hash.clone(), - tip_consensus_hash.clone(), - tip_block_hash.clone(), - tip_block_hash.clone(), - tip_height, - None, - ); - - debug!( - "Begin scanning transaction mempool at {}/{} height={:?}", - &curr_tip_consensus_hash, &curr_tip_block_hash, curr_tip_height - ); - - let mut next_timestamp = loop { + loop { // walk to where the first transaction we can mine can be found - match self.walk( + match self.walk_forwards( chainstate, &curr_tip_consensus_hash, &curr_tip_block_hash, @@ -596,10 +569,12 @@ impl MemPoolDB { next_height, next_timestamp, ) => { - curr_tip_consensus_hash = next_consensus_hash; - curr_tip_block_hash = next_block_bhh; - curr_tip_height = Some(next_height); - break next_timestamp; + return Ok(Some(MemPoolWalkResult::Chainstate( + next_consensus_hash, + next_block_bhh, + next_height, + next_timestamp, + ))); } MemPoolWalkResult::NoneAtHeight( next_consensus_hash, @@ -611,7 +586,7 @@ impl MemPoolDB { "Stopping mempool walk because no mempool entries at height = {}", next_height ); - return Ok(()); + return Ok(None); } else { curr_tip_consensus_hash = next_consensus_hash; curr_tip_block_hash = next_block_hash; @@ -619,9 +594,78 @@ impl MemPoolDB { } } MemPoolWalkResult::Done => { - return Ok(()); + return Ok(None); } } + } + } + + /// + /// Iterate over candidates in the mempool + /// todo will be called once for each bundle of transactions at + /// each ancestor chain tip from the given one, starting with the + /// least recent chain tip and working forwards until there are + /// no more transactions to consider. Each batch of transactions + /// passed to todo will be sorted in nonce order. + pub fn iterate_candidates( + &self, + tip_consensus_hash: &ConsensusHash, + tip_block_hash: &BlockHeaderHash, + tip_height: u64, + chainstate: &mut StacksChainState, + mut todo: F, + ) -> Result<(), E> + where + F: FnMut(Vec) -> Result<(), E>, + E: From + From, + { + let ( + mut curr_tip_consensus_hash, + max_tip_consensus_hash, + mut curr_tip_block_hash, + max_tip_block_hash, + max_tip_height, + mut curr_tip_height, + ) = ( + tip_consensus_hash.clone(), + tip_consensus_hash.clone(), + tip_block_hash.clone(), + tip_block_hash.clone(), + tip_height, + None, + ); + + debug!( + "Begin scanning transaction mempool with max block {}/{} height={:?}", + &max_tip_consensus_hash, &max_tip_block_hash, max_tip_height + ); + + let mut next_timestamp = match self.get_next_timestamp_at_next_block::( + chainstate, + curr_tip_consensus_hash, + curr_tip_block_hash, + curr_tip_height, + &max_tip_consensus_hash, + &max_tip_block_hash, + max_tip_height, + )? { + None => { + return Ok(()); + } + Some(MemPoolWalkResult::Chainstate( + next_consensus_hash, + next_block_bhh, + next_height, + next_timestamp, + )) => { + curr_tip_consensus_hash = next_consensus_hash; + curr_tip_block_hash = next_block_bhh; + curr_tip_height = Some(next_height); + next_timestamp + } + _ => { + unreachable!("Function only returns MemPoolWalkResult of type Chainstate") + } }; loop { @@ -645,51 +689,34 @@ impl MemPoolDB { &self.db, &curr_tip_consensus_hash, &curr_tip_block_hash, - next_timestamp.clone(), + next_timestamp, )? { Some(ts) => ts, - None => loop { - // walk back - match self.walk( - chainstate, - &curr_tip_consensus_hash, - &curr_tip_block_hash, - curr_tip_height, - &max_tip_consensus_hash, - &max_tip_block_hash, - max_tip_height, - )? { - MemPoolWalkResult::Chainstate( - next_consensus_hash, - next_block_bhh, - next_height, - next_timestamp, - ) => { - curr_tip_consensus_hash = next_consensus_hash; - curr_tip_block_hash = next_block_bhh; - curr_tip_height = Some(next_height); - break next_timestamp; - } - MemPoolWalkResult::NoneAtHeight( - next_consensus_hash, - next_block_hash, - next_height, - ) => { - if std::env::var("STACKS_MEMPOOL_BAD_BEHAVIOR") == Ok("1".into()) { - warn!( - "Stopping mempool walk because no mempool entries at height = {}", - next_height - ); - return Ok(()); - } else { - curr_tip_consensus_hash = next_consensus_hash; - curr_tip_block_hash = next_block_hash; - curr_tip_height = Some(next_height); - } - } - MemPoolWalkResult::Done => { - return Ok(()); - } + None => match self.get_next_timestamp_at_next_block::( + chainstate, + curr_tip_consensus_hash, + curr_tip_block_hash, + curr_tip_height, + &max_tip_consensus_hash, + &max_tip_block_hash, + max_tip_height, + )? { + None => { + return Ok(()); + } + Some(MemPoolWalkResult::Chainstate( + next_consensus_hash, + next_block_bhh, + next_height, + next_timestamp, + )) => { + curr_tip_consensus_hash = next_consensus_hash; + curr_tip_block_hash = next_block_bhh; + curr_tip_height = Some(next_height); + next_timestamp + } + _ => { + unreachable!("Function only returns MemPoolWalkResult of type Chainstate") } }, }; From 40a6584755500f47d892977b7716875137c88db4 Mon Sep 17 00:00:00 2001 From: Pavitthra Pandurangan Date: Wed, 17 Mar 2021 14:45:13 -0700 Subject: [PATCH 3/9] Changed min tip height for mempool walk, added neon integration test --- src/core/mempool.rs | 8 +- testnet/stacks-node/src/tests/integrations.rs | 166 ++++++++++++++++++ 2 files changed, 173 insertions(+), 1 deletion(-) diff --git a/src/core/mempool.rs b/src/core/mempool.rs index e925aaadfd..87cf2764ac 100644 --- a/src/core/mempool.rs +++ b/src/core/mempool.rs @@ -619,6 +619,12 @@ impl MemPoolDB { F: FnMut(Vec) -> Result<(), E>, E: From + From, { + // Want to consider transactions at height (`tip_height - MEMPOOL_MAX_TRANSACTION_AGE`) + let min_tip_height = match tip_height.checked_sub(MEMPOOL_MAX_TRANSACTION_AGE) { + None => None, + Some(h) if h == 0 => None, + Some(h) => Some(h - 1), + }; let ( mut curr_tip_consensus_hash, max_tip_consensus_hash, @@ -632,7 +638,7 @@ impl MemPoolDB { tip_block_hash.clone(), tip_block_hash.clone(), tip_height, - None, + min_tip_height, ); debug!( diff --git a/testnet/stacks-node/src/tests/integrations.rs b/testnet/stacks-node/src/tests/integrations.rs index c311430bc3..c86f7da0fc 100644 --- a/testnet/stacks-node/src/tests/integrations.rs +++ b/testnet/stacks-node/src/tests/integrations.rs @@ -1025,6 +1025,172 @@ fn contract_stx_transfer() { run_loop.start(num_rounds).unwrap(); } +#[test] +fn mine_transactions_out_of_order() { + let mut conf = super::new_test_conf(); + + let sk = StacksPrivateKey::from_hex(SK_3).unwrap(); + let addr = to_addr(&sk); + conf.burnchain.commit_anchor_block_within = 5000; + conf.add_initial_balance(addr.to_string(), 100000); + + let num_rounds = 6; + let mut run_loop = RunLoop::new(conf); + + run_loop + .callbacks + .on_new_tenure(|round, _burnchain_tip, chain_tip, tenure| { + let mut chainstate_copy = tenure.open_chainstate(); + + let sk = StacksPrivateKey::from_hex(SK_3).unwrap(); + let header_hash = chain_tip.block.block_hash(); + let consensus_hash = chain_tip.metadata.consensus_hash; + + let contract_identifier = QualifiedContractIdentifier::parse(&format!( + "{}.{}", + to_addr(&StacksPrivateKey::from_hex(SK_1).unwrap()).to_string(), + "faucet" + )) + .unwrap(); + + if round == 1 { + // block-height = 2 + let xfer_to_contract = + make_stacks_transfer(&sk, 1, 0, &contract_identifier.into(), 1000); + tenure + .mem_pool + .submit_raw( + &mut chainstate_copy, + &consensus_hash, + &header_hash, + xfer_to_contract, + ) + .unwrap(); + } else if round == 2 { + // block-height > 2 + let publish_tx = make_contract_publish(&sk, 2, 0, "faucet", FAUCET_CONTRACT); + tenure + .mem_pool + .submit_raw( + &mut chainstate_copy, + &consensus_hash, + &header_hash, + publish_tx, + ) + .unwrap(); + } else if round == 3 { + let xfer_to_contract = + make_stacks_transfer(&sk, 3, 0, &contract_identifier.into(), 1000); + tenure + .mem_pool + .submit_raw( + &mut chainstate_copy, + &consensus_hash, + &header_hash, + xfer_to_contract, + ) + .unwrap(); + } else if round == 4 { + let xfer_to_contract = + make_stacks_transfer(&sk, 0, 0, &contract_identifier.into(), 1000); + tenure + .mem_pool + .submit_raw( + &mut chainstate_copy, + &consensus_hash, + &header_hash, + xfer_to_contract, + ) + .unwrap(); + } + + return; + }); + + run_loop.callbacks.on_new_stacks_chain_state( + |round, _burnchain_tip, chain_tip, chain_state, burn_dbconn| { + let contract_identifier = QualifiedContractIdentifier::parse(&format!( + "{}.{}", + to_addr(&StacksPrivateKey::from_hex(SK_1).unwrap()).to_string(), + "faucet" + )) + .unwrap(); + + match round { + 1 => { + assert_eq!(chain_tip.metadata.block_height, 2); + assert_eq!(chain_tip.block.txs.len(), 1); + } + 2 => { + assert_eq!(chain_tip.metadata.block_height, 3); + assert_eq!(chain_tip.block.txs.len(), 1); + } + 3 => { + assert_eq!(chain_tip.metadata.block_height, 4); + assert_eq!(chain_tip.block.txs.len(), 1); + } + 4 => { + assert_eq!(chain_tip.metadata.block_height, 5); + assert_eq!(chain_tip.block.txs.len(), 2); + + // check that 1000 stx _was_ transfered to the contract principal + let curr_tip = ( + chain_tip.metadata.consensus_hash.clone(), + chain_tip.metadata.anchored_header.block_hash(), + ); + assert_eq!( + chain_state + .with_read_only_clarity_tx( + burn_dbconn, + &StacksBlockHeader::make_index_block_hash(&curr_tip.0, &curr_tip.1), + |conn| { + conn.with_clarity_db_readonly(|db| { + db.get_account_stx_balance( + &contract_identifier.clone().into(), + ) + .amount_unlocked + }) + } + ) + .unwrap(), + 1000 + ); + } + 5 => { + assert_eq!(chain_tip.metadata.block_height, 6); + assert_eq!(chain_tip.block.txs.len(), 4); + + // check that 3000 stx _was_ transferred to the contract principal total + let curr_tip = ( + chain_tip.metadata.consensus_hash.clone(), + chain_tip.metadata.anchored_header.block_hash(), + ); + assert_eq!( + chain_state + .with_read_only_clarity_tx( + burn_dbconn, + &StacksBlockHeader::make_index_block_hash(&curr_tip.0, &curr_tip.1), + |conn| { + conn.with_clarity_db_readonly(|db| { + db.get_account_stx_balance( + &contract_identifier.clone().into(), + ) + .amount_unlocked + }) + } + ) + .unwrap(), + 3000 + ); + } + _ => {} + } + }, + ); + + run_loop.start(num_rounds).unwrap(); +} + /// Test mining a smart contract twice (in non-sequential blocks) /// this can happen in the testnet leader if they get "behind" /// the burnchain and a previously mined block doesn't get included From 60b1a21de039f6c0c5b667576d07f6ef6201f856 Mon Sep 17 00:00:00 2001 From: Pavitthra Pandurangan Date: Thu, 1 Apr 2021 12:44:43 -0400 Subject: [PATCH 4/9] Fix replace-across-fork-logic --- src/core/mempool.rs | 88 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 69 insertions(+), 19 deletions(-) diff --git a/src/core/mempool.rs b/src/core/mempool.rs index 87cf2764ac..9754c1417f 100644 --- a/src/core/mempool.rs +++ b/src/core/mempool.rs @@ -255,31 +255,36 @@ impl<'a> MemPoolTx<'a> { self.tx.commit().map_err(db_error::SqliteError) } - fn is_block_in_fork( + fn are_blocks_in_same_fork( &mut self, chainstate: &mut StacksChainState, - check_consensus_hash: &ConsensusHash, - check_stacks_block: &BlockHeaderHash, - cur_consensus_hash: &ConsensusHash, - cur_stacks_block: &BlockHeaderHash, + first_consensus_hash: &ConsensusHash, + first_stacks_block: &BlockHeaderHash, + second_consensus_hash: &ConsensusHash, + second_stacks_block: &BlockHeaderHash, ) -> Result { - let admitter_block = - StacksBlockHeader::make_index_block_hash(cur_consensus_hash, cur_stacks_block); - let index_block = - StacksBlockHeader::make_index_block_hash(check_consensus_hash, check_stacks_block); + let first_block = + StacksBlockHeader::make_index_block_hash(first_consensus_hash, first_stacks_block); + let second_block = + StacksBlockHeader::make_index_block_hash(second_consensus_hash, second_stacks_block); // short circuit equality - if admitter_block == index_block { + if second_block == first_block { return Ok(true); } - let height_result = chainstate - .with_clarity_marf(|marf| marf.get_block_height_of(&index_block, &admitter_block)); - match height_result { - Ok(x) => { - eprintln!("{} from {} => {:?}", &index_block, &admitter_block, x); - Ok(x.is_some()) - } - Err(x) => Err(db_error::IndexError(x)), + let height_of_first_with_second_tip = chainstate + .with_clarity_marf(|marf| marf.get_block_height_of(&first_block, &second_block)); + let height_of_second_with_first_tip = chainstate + .with_clarity_marf(|marf| marf.get_block_height_of(&second_block, &first_block)); + + match ( + height_of_first_with_second_tip, + height_of_second_with_first_tip, + ) { + (Ok(None), Ok(None)) => Ok(false), + (Ok(_), Ok(_)) => Ok(true), + (Err(x), _) => Err(db_error::IndexError(x)), + (_, Err(x)) => Err(db_error::IndexError(x)), } } } @@ -951,7 +956,7 @@ impl MemPoolDB { // is this a replace-by-fee ? replace_reason = MemPoolDropReason::REPLACE_BY_FEE; true - } else if !tx.is_block_in_fork( + } else if !tx.are_blocks_in_same_fork( chainstate, &prior_tx.consensus_hash, &prior_tx.block_header_hash, @@ -1553,6 +1558,51 @@ mod tests { ); // let's test replace-across-fork while we're here. + // first try to replace a tx in b_2 in b_1 - should fail because they are in the same fork + let mut mempool_tx = mempool.tx_begin().unwrap(); + let block = &b_1; + let tx = &txs[1]; + let origin_address = StacksAddress { + version: 22, + bytes: Hash160::from_data(&[0; 32]), + }; + let sponsor_address = StacksAddress { + version: 22, + bytes: Hash160::from_data(&[1; 32]), + }; + + let txid = tx.txid(); + let tx_bytes = tx.serialize_to_vec(); + let tx_fee = tx.get_tx_fee(); + + let height = 3; + let origin_nonce = 1; + let sponsor_nonce = 1; + + // make sure that we already have the transaction we're testing for replace-across-fork + assert!(MemPoolDB::db_has_tx(&mempool_tx, &txid).unwrap()); + + assert!(MemPoolDB::try_add_tx( + &mut mempool_tx, + &mut chainstate, + &block.0, + &block.1, + txid, + tx_bytes, + tx_fee, + height, + &origin_address, + origin_nonce, + &sponsor_address, + sponsor_nonce, + None, + ) + .is_err()); + + assert!(MemPoolDB::db_has_tx(&mempool_tx, &txid).unwrap()); + mempool_tx.commit().unwrap(); + + // now try replace-across-fork from b_2 to b_4 let mut mempool_tx = mempool.tx_begin().unwrap(); let block = &b_4; let tx = &txs[1]; From 69b720b56bdb1b105d2b511fe89251b1446917d8 Mon Sep 17 00:00:00 2001 From: Pavitthra Pandurangan Date: Thu, 1 Apr 2021 16:46:29 -0400 Subject: [PATCH 5/9] fix: changed mempool walk logic, fixing #2393 and #2394 --- src/chainstate/stacks/db/headers.rs | 17 - src/chainstate/stacks/db/mod.rs | 4 - src/core/mempool.rs | 532 ++++++++-------------------- src/net/relay.rs | 2 +- src/util/db.rs | 11 + 5 files changed, 154 insertions(+), 412 deletions(-) diff --git a/src/chainstate/stacks/db/headers.rs b/src/chainstate/stacks/db/headers.rs index 5cd79932a2..e5c4158445 100644 --- a/src/chainstate/stacks/db/headers.rs +++ b/src/chainstate/stacks/db/headers.rs @@ -279,23 +279,6 @@ impl StacksChainState { } } - /// Get an ancestor block header given an index hash - pub fn get_index_tip_ancestor_conn( - conn: &StacksDBConn, - tip_index_hash: &StacksBlockId, - height: u64, - ) -> Result, Error> { - match conn - .get_ancestor_block_hash(height, tip_index_hash) - .map_err(Error::DBError)? - { - Some(bhh) => { - StacksChainState::get_stacks_block_header_info_by_index_block_hash(conn, &bhh) - } - None => Ok(None), - } - } - /// Get a segment of headers from the canonical chain pub fn get_ancestors_headers( conn: &Connection, diff --git a/src/chainstate/stacks/db/mod.rs b/src/chainstate/stacks/db/mod.rs index 5155d69321..220a157daf 100644 --- a/src/chainstate/stacks/db/mod.rs +++ b/src/chainstate/stacks/db/mod.rs @@ -1424,10 +1424,6 @@ impl StacksChainState { Ok(StacksDBTx::new(&mut self.state_index, ())) } - pub fn index_conn<'a>(&'a self) -> Result, Error> { - Ok(StacksDBConn::new(&self.state_index, ())) - } - /// Begin a transaction against the underlying DB /// Does not create a Clarity instance, and does not affect the MARF. pub fn db_tx_begin<'a>(&'a mut self) -> Result, Error> { diff --git a/src/core/mempool.rs b/src/core/mempool.rs index 9754c1417f..7755be029e 100644 --- a/src/core/mempool.rs +++ b/src/core/mempool.rs @@ -34,7 +34,7 @@ use net::StacksMessageCodec; use chainstate::burn::BlockHeaderHash; use chainstate::stacks::{ db::blocks::MemPoolRejection, db::StacksChainState, index::Error as MarfError, - Error as ChainstateError, StacksAddress, StacksBlockHeader, StacksTransaction, + Error as ChainstateError, StacksAddress, StacksBlockHeader, StacksBlockId, StacksTransaction, }; use std::fs; use std::io::Read; @@ -60,6 +60,7 @@ use monitoring::increment_stx_mempool_gc; use vm::types::PrincipalData; use crate::monitoring; +use chainstate::stacks::db::StacksHeaderInfo; // maximum number of confirmations a transaction can have before it's garbage-collected pub const MEMPOOL_MAX_TRANSACTION_AGE: u64 = 256; @@ -405,213 +406,15 @@ impl MemPoolDB { }) } - /// Gets the lowest ancestor block(where `height > curr_tip_height && height <= max_tip_height`) - /// that is in the mempool of the "max" block (identified by `max_tip_block_hash` and - /// `max_tip_consensus_hash`). If found, this function returns a `MemPoolWalkResult::Chainstate` - /// enum along with identifying information about the ancestor block: its height - /// (`next_height`), its consensus hash, its block hash, and the earliest timestamp of the - /// transactions in it that are also in the mempool. - /// - /// If the mempool has tips at a certain height (but not an ancestor of the "max" block), the - /// function returns a `MemPoolWalkResult::NoneAtHeight` enum with that height (`next_height`). - /// - /// If there is no block height that satisfies - /// `height > curr_tip_height && height <= max_tip_height`, if the height of the lowest - /// block found matches the curr_tip_height, or if the function is unable to find the ancestor - /// of the "max" block at `next_height`, the `MemPoolWalkResult::Done` enum is returned. - /// - /// # Parameters - /// - `curr_tip_height`: setting `curr_tip_height` to `None` is equivalent to searching for - /// blocks with `height >= 0`. Setting `curr_tip_height` to `Some(0)` is equivalent to - /// searching for blocks with `height > 0` instead. - /// - `max_{}`: these parameters identify the max (or highest) block in the search. The search - /// is inclusive of this block. The function will only return a block (through the Chainstate - /// enum) that is either an ancestor of this block or the block itself. - fn walk_forwards( - &self, - chainstate: &mut StacksChainState, - tip_consensus_hash: &ConsensusHash, - tip_block_hash: &BlockHeaderHash, - curr_tip_height: Option, - max_tip_consensus_hash: &ConsensusHash, - max_tip_block_hash: &BlockHeaderHash, - max_tip_height: u64, - ) -> Result { - // Get the height of the block with the lowest height that is in the mempool - let next_height = match MemPoolDB::get_lowest_block_height_in_mempool( - &self.db, - max_tip_height, - curr_tip_height, - )? { - Some(next_height) => next_height, - None => { - debug!("Done scanning mempool: no transactions left"; "height" => curr_tip_height); - return Ok(MemPoolWalkResult::Done); - } - }; - - if let Some(h) = curr_tip_height { - if next_height == max_tip_height && h == max_tip_height { - // we're done -- tried every tx - debug!("Done scanning mempool: at height {}", max_tip_height); - return Ok(MemPoolWalkResult::Done); - } - } - - let next_tips = MemPoolDB::get_chain_tips_at_height(&self.db, next_height)?; - - let ancestor_tip = { - let headers_conn = chainstate.index_conn()?; - let index_block = StacksBlockHeader::make_index_block_hash( - max_tip_consensus_hash, - max_tip_block_hash, - ); - match StacksChainState::get_index_tip_ancestor_conn( - &headers_conn, - &index_block, - next_height, - )? { - Some(tip_info) => tip_info, - None => { - // no ancestor at the height, this is a error because this shouldn't ever - // happen: a chain tip should have an ancestor at every height < than its own - // height - error!( - "Done scanning mempool: no known ancestor of tip at height"; - "height" => next_height, - "tip_consensus_hash" => %tip_consensus_hash, - "tip_block_hash" => %tip_block_hash, - "tip_index_hash" => %StacksBlockHeader::make_index_block_hash( - tip_consensus_hash, - tip_block_hash - ), - ); - return Ok(MemPoolWalkResult::Done); - } - } - }; - - // find out which tip is the ancestor tip - let mut found = false; - let mut next_tip_consensus_hash = tip_consensus_hash.clone(); - let mut next_tip_block_hash = tip_block_hash.clone(); - - let ancestor_bh = ancestor_tip.anchored_header.block_hash(); - - for (consensus_hash, block_bhh) in next_tips.into_iter() { - if ancestor_tip.consensus_hash == consensus_hash && ancestor_bh == block_bhh { - found = true; - next_tip_consensus_hash = consensus_hash; - next_tip_block_hash = block_bhh; - break; - } - } - - if !found { - // no ancestor at height, try a later height - debug!( - "None of the available prior chain tips at {} is an ancestor of {}/{}", - next_height, max_tip_consensus_hash, max_tip_block_hash - ); - return Ok(MemPoolWalkResult::NoneAtHeight( - ancestor_tip.consensus_hash, - ancestor_bh, - next_height, - )); - } - - let next_timestamp = match MemPoolDB::get_next_timestamp( - &self.db, - &next_tip_consensus_hash, - &next_tip_block_hash, - 0, - )? { - Some(ts) => ts, - None => { - unreachable!("No transactions at a chain tip that exists"); - } - }; - - debug!( - "Will start scaning mempool at {}/{} height={} ts={}", - &next_tip_consensus_hash, &next_tip_block_hash, next_height, next_timestamp - ); - Ok(MemPoolWalkResult::Chainstate( - next_tip_consensus_hash, - next_tip_block_hash, - next_height, - next_timestamp, - )) - } - - fn get_next_timestamp_at_next_block( - &self, - chainstate: &mut StacksChainState, - mut curr_tip_consensus_hash: ConsensusHash, - mut curr_tip_block_hash: BlockHeaderHash, - mut curr_tip_height: Option, - max_tip_consensus_hash: &ConsensusHash, - max_tip_block_hash: &BlockHeaderHash, - max_tip_height: u64, - ) -> Result, E> - where - E: From + From, - { - loop { - // walk to where the first transaction we can mine can be found - match self.walk_forwards( - chainstate, - &curr_tip_consensus_hash, - &curr_tip_block_hash, - curr_tip_height, - &max_tip_consensus_hash, - &max_tip_block_hash, - max_tip_height, - )? { - MemPoolWalkResult::Chainstate( - next_consensus_hash, - next_block_bhh, - next_height, - next_timestamp, - ) => { - return Ok(Some(MemPoolWalkResult::Chainstate( - next_consensus_hash, - next_block_bhh, - next_height, - next_timestamp, - ))); - } - MemPoolWalkResult::NoneAtHeight( - next_consensus_hash, - next_block_hash, - next_height, - ) => { - if std::env::var("STACKS_MEMPOOL_BAD_BEHAVIOR") == Ok("1".into()) { - warn!( - "Stopping mempool walk because no mempool entries at height = {}", - next_height - ); - return Ok(None); - } else { - curr_tip_consensus_hash = next_consensus_hash; - curr_tip_block_hash = next_block_hash; - curr_tip_height = Some(next_height); - } - } - MemPoolWalkResult::Done => { - return Ok(None); - } - } - } - } - /// /// Iterate over candidates in the mempool /// todo will be called once for each bundle of transactions at - /// each ancestor chain tip from the given one, starting with the - /// least recent chain tip and working forwards until there are + /// each nonce, starting with the smallest nonce and going higher until there are /// no more transactions to consider. Each batch of transactions - /// passed to todo will be sorted in nonce order. + /// passed to todo will be sorted by sponsor_nonce. + /// + /// Consider transactions across all forks where the transactions have + /// height >= max(0, tip_height - MEMPOOL_MAX_TRANSACTION_AGE) and height <= tip_height. pub fn iterate_candidates( &self, tip_consensus_hash: &ConsensusHash, @@ -624,112 +427,62 @@ impl MemPoolDB { F: FnMut(Vec) -> Result<(), E>, E: From + From, { - // Want to consider transactions at height (`tip_height - MEMPOOL_MAX_TRANSACTION_AGE`) - let min_tip_height = match tip_height.checked_sub(MEMPOOL_MAX_TRANSACTION_AGE) { + // Want to consider transactions with + // height > max(0, `tip_height - MEMPOOL_MAX_TRANSACTION_AGE`) - 1 + let min_height = match tip_height.checked_sub(MEMPOOL_MAX_TRANSACTION_AGE) { None => None, Some(h) if h == 0 => None, Some(h) => Some(h - 1), }; - let ( - mut curr_tip_consensus_hash, - max_tip_consensus_hash, - mut curr_tip_block_hash, - max_tip_block_hash, - max_tip_height, - mut curr_tip_height, - ) = ( - tip_consensus_hash.clone(), - tip_consensus_hash.clone(), - tip_block_hash.clone(), - tip_block_hash.clone(), - tip_height, - min_tip_height, - ); - debug!( - "Begin scanning transaction mempool with max block {}/{} height={:?}", - &max_tip_consensus_hash, &max_tip_block_hash, max_tip_height - ); - - let mut next_timestamp = match self.get_next_timestamp_at_next_block::( - chainstate, - curr_tip_consensus_hash, - curr_tip_block_hash, - curr_tip_height, - &max_tip_consensus_hash, - &max_tip_block_hash, - max_tip_height, - )? { - None => { - return Ok(()); - } - Some(MemPoolWalkResult::Chainstate( - next_consensus_hash, - next_block_bhh, - next_height, - next_timestamp, - )) => { - curr_tip_consensus_hash = next_consensus_hash; - curr_tip_block_hash = next_block_bhh; - curr_tip_height = Some(next_height); - next_timestamp - } - _ => { - unreachable!("Function only returns MemPoolWalkResult of type Chainstate") - } - }; + let (mut next_nonce, mut next_timestamp) = + match MemPoolDB::get_next_nonce(&self.db, min_height, tip_height, None)? { + None => { + return Ok(()); + } + Some((nonce, ts)) => (nonce, ts), + }; loop { - let available_txs = MemPoolDB::get_txs_at( + let available_txs = MemPoolDB::get_txs_at_nonce_and_timestamp( &self.db, - &curr_tip_consensus_hash, - &curr_tip_block_hash, + min_height, + tip_height, + next_nonce, next_timestamp, )?; debug!( - "Have {} transactions at {}/{} height={:?} at or after {}", + "Have {} transactions at nonce={} at or after {}", available_txs.len(), - &curr_tip_consensus_hash, - &curr_tip_block_hash, - curr_tip_height, + next_nonce, next_timestamp ); todo(available_txs)?; - next_timestamp = match MemPoolDB::get_next_timestamp( + next_timestamp = match MemPoolDB::get_next_timestamp_at_nonce( &self.db, - &curr_tip_consensus_hash, - &curr_tip_block_hash, + min_height, + tip_height, next_timestamp, + next_nonce, )? { Some(ts) => ts, - None => match self.get_next_timestamp_at_next_block::( - chainstate, - curr_tip_consensus_hash, - curr_tip_block_hash, - curr_tip_height, - &max_tip_consensus_hash, - &max_tip_block_hash, - max_tip_height, - )? { - None => { - return Ok(()); - } - Some(MemPoolWalkResult::Chainstate( - next_consensus_hash, - next_block_bhh, - next_height, - next_timestamp, - )) => { - curr_tip_consensus_hash = next_consensus_hash; - curr_tip_block_hash = next_block_bhh; - curr_tip_height = Some(next_height); - next_timestamp - } - _ => { - unreachable!("Function only returns MemPoolWalkResult of type Chainstate") + None => { + match MemPoolDB::get_next_nonce( + &self.db, + min_height, + tip_height, + Some(next_nonce), + )? { + None => { + return Ok(()); + } + Some((nonce, ts)) => { + next_nonce = nonce; + ts + } } - }, + } }; } } @@ -768,18 +521,91 @@ impl MemPoolDB { Ok(rows) } - /// Get the next timestamp after this one that occurs in this chain tip. - pub fn get_next_timestamp( + /// Get all transactions at a specific block + #[cfg(test)] + pub fn get_num_tx_at_block( conn: &DBConn, consensus_hash: &ConsensusHash, block_header_hash: &BlockHeaderHash, + ) -> Result { + let sql = "SELECT * FROM mempool WHERE consensus_hash = ?1 AND block_header_hash = ?2"; + let args: &[&dyn ToSql] = &[consensus_hash, block_header_hash]; + let rows = query_rows::(conn, &sql, args)?; + Ok(rows.len()) + } + + /// Get the next nonce/timestamp after this nonce that occurs in the height range. + pub fn get_next_nonce( + conn: &DBConn, + min_height: Option, + max_height: u64, + nonce: Option, + ) -> Result, db_error> { + let nonce = match nonce { + None => -1, + Some(n) => u64_to_sql(n)?, + }; + let min_height_sql_arg = match min_height { + None => -1, + Some(h) => u64_to_sql(h)?, + }; + let sql = "SELECT origin_nonce, accept_time FROM mempool WHERE origin_nonce > ?1 AND \ + height > ?2 AND height <= ?3 ORDER BY origin_nonce, accept_time ASC LIMIT 1"; + + let args: &[&dyn ToSql] = &[&nonce, &min_height_sql_arg, &u64_to_sql(max_height)?]; + query_row(conn, sql, args) + } + + /// Get the next timestamp after this one that occurs with this nonce in the height range. + pub fn get_next_timestamp_at_nonce( + conn: &DBConn, + min_height: Option, + max_height: u64, timestamp: u64, - ) -> Result, db_error> { - let sql = "SELECT accept_time FROM mempool WHERE accept_time > ?1 AND consensus_hash = ?2 AND block_header_hash = ?3 ORDER BY accept_time ASC LIMIT 1"; - let args: &[&dyn ToSql] = &[&u64_to_sql(timestamp)?, consensus_hash, block_header_hash]; + nonce: u64, + ) -> Result, db_error> { + let min_height_sql_arg = match min_height { + None => -1, + Some(h) => u64_to_sql(h)?, + }; + + let sql = "SELECT accept_time FROM mempool WHERE accept_time > ?1 AND \ + origin_nonce = ?2 AND height > ?3 AND height <= ?4 ORDER BY accept_time ASC LIMIT 1"; + let args: &[&dyn ToSql] = &[ + &u64_to_sql(timestamp)?, + &u64_to_sql(nonce)?, + &min_height_sql_arg, + &u64_to_sql(max_height)?, + ]; query_row(conn, sql, args) } + /// Get all transactions at a particular nonce and timestamp on a given chain tip. + /// Order them by sponsor nonce. + pub fn get_txs_at_nonce_and_timestamp( + conn: &DBConn, + min_height: Option, + max_height: u64, + nonce: u64, + timestamp: u64, + ) -> Result, db_error> { + let min_height_sql_arg = match min_height { + None => -1, + Some(h) => u64_to_sql(h)?, + }; + + let sql = "SELECT * FROM mempool WHERE origin_nonce = ?1 AND accept_time = ?2 AND \ + height > ?3 AND height <= ?4 ORDER BY sponsor_nonce ASC"; + let args: &[&dyn ToSql] = &[ + &u64_to_sql(nonce)?, + &u64_to_sql(timestamp)?, + &min_height_sql_arg, + &u64_to_sql(max_height)?, + ]; + let rows = query_rows::(conn, &sql, args)?; + Ok(rows) + } + /// Get all transactions at a particular timestamp on a given chain tip. /// Order them by origin nonce. pub fn get_txs_at( @@ -801,46 +627,6 @@ impl MemPoolDB { query_row(conn, sql, args) } - /// Given a chain tip, find the lowest block-height with height <= max_height and height > - /// curr_height in the mempool - pub fn get_lowest_block_height_in_mempool( - conn: &DBConn, - max_height: u64, - curr_height: Option, - ) -> Result, db_error> { - let sql = "SELECT height FROM mempool WHERE height <= ?1 AND height > ?2 \ - ORDER BY height ASC LIMIT 1"; - let curr_height_sql_arg = match curr_height { - None => -1, - Some(h) => u64_to_sql(h)?, - }; - let args: &[&dyn ToSql] = &[&u64_to_sql(max_height)?, &curr_height_sql_arg]; - query_row(conn, sql, args) - } - - /// Get chain tip(s) at a given height that have transactions - pub fn get_chain_tips_at_height( - conn: &DBConn, - height: u64, - ) -> Result, db_error> { - let sql = "SELECT consensus_hash,block_header_hash FROM mempool WHERE height = ?1"; - let args: &[&dyn ToSql] = &[&u64_to_sql(height)?]; - - let mut stmt = conn.prepare(sql).map_err(db_error::SqliteError)?; - - let mut rows = stmt.query(args).map_err(db_error::SqliteError)?; - - // gather - let mut tips = vec![]; - while let Some(row) = rows.next().map_err(|e| db_error::SqliteError(e))? { - let consensus_hash = ConsensusHash::from_column(&row, "consensus_hash")?; - let block_hash = BlockHeaderHash::from_column(&row, "block_header_hash")?; - tips.push((consensus_hash, block_hash)); - } - - Ok(tips) - } - /// Get a number of transactions after a given timestamp on a given chain tip. pub fn get_txs_after( conn: &DBConn, @@ -1520,8 +1306,8 @@ mod tests { ) .unwrap(); assert_eq!( - count_txs, 2, - "Mempool should find two transactions from b_5" + count_txs, 3, + "Mempool should find three transactions from b_5" ); let mut count_txs = 0; @@ -1537,7 +1323,10 @@ mod tests { }, ) .unwrap(); - assert_eq!(count_txs, 1, "Mempool should find one transaction from b_3"); + assert_eq!( + count_txs, 2, + "Mempool should find two transactions from b_3" + ); let mut count_txs = 0; mempool @@ -1553,8 +1342,8 @@ mod tests { ) .unwrap(); assert_eq!( - count_txs, 2, - "Mempool should find two transactions from b_4" + count_txs, 3, + "Mempool should find three transactions from b_4" ); // let's test replace-across-fork while we're here. @@ -1603,6 +1392,15 @@ mod tests { mempool_tx.commit().unwrap(); // now try replace-across-fork from b_2 to b_4 + // check that the number of transactions at b_2 and b_4 starts at 1 each + assert_eq!( + MemPoolDB::get_num_tx_at_block(&mempool.db, &b_4.0, &b_4.1).unwrap(), + 1 + ); + assert_eq!( + MemPoolDB::get_num_tx_at_block(&mempool.db, &b_2.0, &b_2.1).unwrap(), + 1 + ); let mut mempool_tx = mempool.tx_begin().unwrap(); let block = &b_4; let tx = &txs[1]; @@ -1647,60 +1445,14 @@ mod tests { mempool_tx.commit().unwrap(); - // after replace-across-fork, tx[1] should have moved from the b_2->b_5 fork - // to b_4 - let mut count_txs = 0; - mempool - .iterate_candidates::<_, ChainstateError>( - &b_2.0, - &b_2.1, - 2, - &mut chainstate, - |available_txs| { - count_txs += available_txs.len(); - Ok(()) - }, - ) - .unwrap(); + // after replace-across-fork, tx[1] should have moved from the b_2->b_5 fork to b_4 assert_eq!( - count_txs, 1, - "After replace, mempool should find one transaction from b_2" + MemPoolDB::get_num_tx_at_block(&mempool.db, &b_4.0, &b_4.1).unwrap(), + 2 ); - - let mut count_txs = 0; - mempool - .iterate_candidates::<_, ChainstateError>( - &b_5.0, - &b_5.1, - 3, - &mut chainstate, - |available_txs| { - count_txs += available_txs.len(); - Ok(()) - }, - ) - .unwrap(); assert_eq!( - count_txs, 1, - "After replace, mempool should find one transaction from b_5" - ); - - let mut count_txs = 0; - mempool - .iterate_candidates::<_, ChainstateError>( - &b_4.0, - &b_4.1, - 3, - &mut chainstate, - |available_txs| { - count_txs += available_txs.len(); - Ok(()) - }, - ) - .unwrap(); - assert_eq!( - count_txs, 3, - "After replace, mempool should find *three* transactions from b_4" + MemPoolDB::get_num_tx_at_block(&mempool.db, &b_2.0, &b_2.1).unwrap(), + 0 ); } diff --git a/src/net/relay.rs b/src/net/relay.rs index d1fe92da27..877bafa394 100644 --- a/src/net/relay.rs +++ b/src/net/relay.rs @@ -1084,7 +1084,7 @@ impl Relayer { // garbage-collect if chain_height > MEMPOOL_MAX_TRANSACTION_AGE { - let min_height = chain_height - MEMPOOL_MAX_TRANSACTION_AGE; + let min_height = chain_height.saturating_sub(MEMPOOL_MAX_TRANSACTION_AGE); let mut mempool_tx = mempool.tx_begin()?; debug!( diff --git a/src/util/db.rs b/src/util/db.rs index ab9388a39e..38138b36c9 100644 --- a/src/util/db.rs +++ b/src/util/db.rs @@ -169,6 +169,17 @@ impl FromRow for u64 { } } +impl FromRow<(u64, u64)> for (u64, u64) { + fn from_row<'a>(row: &'a Row) -> Result<(u64, u64), Error> { + let t1: i64 = row.get_unwrap(0); + let t2: i64 = row.get_unwrap(1); + if t1 < 0 || t2 < 0 { + return Err(Error::ParseError); + } + Ok((t1 as u64, t2 as u64)) + } +} + impl FromColumn for u64 { fn from_column<'a>(row: &'a Row, column_name: &str) -> Result { let x: i64 = row.get_unwrap(column_name); From d0100618b0a338031f42d6b6ab3756bc0723b6e6 Mon Sep 17 00:00:00 2001 From: Pavitthra Pandurangan Date: Mon, 5 Apr 2021 13:10:55 -0400 Subject: [PATCH 6/9] Added pagination for getting txs at a specific nonce/timestamp, fixed tests --- src/chainstate/stacks/db/headers.rs | 9 + src/chainstate/stacks/db/mod.rs | 4 + src/chainstate/stacks/miner.rs | 193 ++++---- src/core/mempool.rs | 443 ++++++++---------- src/util/db.rs | 11 - testnet/stacks-node/src/tests/integrations.rs | 31 +- 6 files changed, 306 insertions(+), 385 deletions(-) diff --git a/src/chainstate/stacks/db/headers.rs b/src/chainstate/stacks/db/headers.rs index e5c4158445..e87a1f7880 100644 --- a/src/chainstate/stacks/db/headers.rs +++ b/src/chainstate/stacks/db/headers.rs @@ -307,6 +307,15 @@ impl StacksChainState { Ok(ancestors) } + /// Get an ancestor's height given the tip index hash and the ancestor index hash + pub fn get_block_height_of_ancestor( + conn: &StacksDBConn, + tip_index_hash: &StacksBlockId, + ancestor_index_hash: &StacksBlockId, + ) -> Result, db_error> { + conn.get_ancestor_block_height(ancestor_index_hash, tip_index_hash) + } + /// Get the genesis (boot code) block header pub fn get_genesis_header_info(conn: &Connection) -> Result { // by construction, only one block can have height 0 in this DB diff --git a/src/chainstate/stacks/db/mod.rs b/src/chainstate/stacks/db/mod.rs index 220a157daf..5155d69321 100644 --- a/src/chainstate/stacks/db/mod.rs +++ b/src/chainstate/stacks/db/mod.rs @@ -1424,6 +1424,10 @@ impl StacksChainState { Ok(StacksDBTx::new(&mut self.state_index, ())) } + pub fn index_conn<'a>(&'a self) -> Result, Error> { + Ok(StacksDBConn::new(&self.state_index, ())) + } + /// Begin a transaction against the underlying DB /// Does not create a Clarity instance, and does not affect the MARF. pub fn db_tx_begin<'a>(&'a mut self) -> Result, Error> { diff --git a/src/chainstate/stacks/miner.rs b/src/chainstate/stacks/miner.rs index d11f2fdcb0..d9984bc7f7 100644 --- a/src/chainstate/stacks/miner.rs +++ b/src/chainstate/stacks/miner.rs @@ -426,44 +426,38 @@ impl<'a> StacksMicroblockBuilder<'a> { let mut bytes_so_far = self.runtime.bytes_so_far; let mut num_txs = self.runtime.num_mined; - let result = mem_pool.iterate_candidates( - &self.anchor_block_consensus_hash, - &self.anchor_block, - self.anchor_block_height, - &mut self.header_reader, - |micro_txs| { - let mut result = Ok(()); - for mempool_tx in micro_txs.into_iter() { - match StacksMicroblockBuilder::mine_next_transaction( - &mut clarity_tx, - mempool_tx.tx.clone(), - mempool_tx.metadata.len, - &mut considered, - bytes_so_far, - ) { - Ok(true) => { - bytes_so_far += mempool_tx.metadata.len; - - debug!( - "Include tx {} ({}) in microblock", - mempool_tx.tx.txid(), - mempool_tx.tx.payload.name() - ); - txs_included.push(mempool_tx.tx); - num_txs += 1; - } - Ok(false) => { - continue; - } - Err(e) => { - result = Err(e); - break; - } + let result = mem_pool.iterate_candidates(self.anchor_block_height, |micro_txs| { + let mut result = Ok(()); + for mempool_tx in micro_txs.into_iter() { + match StacksMicroblockBuilder::mine_next_transaction( + &mut clarity_tx, + mempool_tx.tx.clone(), + mempool_tx.metadata.len, + &mut considered, + bytes_so_far, + ) { + Ok(true) => { + bytes_so_far += mempool_tx.metadata.len; + + debug!( + "Include tx {} ({}) in microblock", + mempool_tx.tx.txid(), + mempool_tx.tx.payload.name() + ); + txs_included.push(mempool_tx.tx); + num_txs += 1; + } + Ok(false) => { + continue; + } + Err(e) => { + result = Err(e); + break; } } - result - }, - ); + } + result + }); self.runtime.bytes_so_far = bytes_so_far; self.clarity_tx.replace(clarity_tx); @@ -1367,7 +1361,6 @@ impl StacksBlockBuilder { &tip_consensus_hash, &tip_block_hash, tip_height ); - let (mut header_reader_chainstate, _) = chainstate_handle.reopen()?; // used for reading block headers during an epoch let (mut chainstate, _) = chainstate_handle.reopen_limited(execution_budget)?; // used for processing a block up to the given limit let mut builder = StacksBlockBuilder::make_block_builder( @@ -1391,86 +1384,80 @@ impl StacksBlockBuilder { let mut block_limit_hit = BlockLimitFunction::NO_LIMIT_HIT; - let result = mempool.iterate_candidates( - &tip_consensus_hash, - &tip_block_hash, - tip_height, - &mut header_reader_chainstate, - |available_txs| { - if block_limit_hit == BlockLimitFunction::LIMIT_REACHED { - return Ok(()); - } + let result = mempool.iterate_candidates(tip_height, |available_txs| { + if block_limit_hit == BlockLimitFunction::LIMIT_REACHED { + return Ok(()); + } - for txinfo in available_txs.into_iter() { - // skip transactions early if we can - if considered.contains(&txinfo.tx.txid()) { + for txinfo in available_txs.into_iter() { + // skip transactions early if we can + if considered.contains(&txinfo.tx.txid()) { + continue; + } + if let Some(nonce) = mined_origin_nonces.get(&txinfo.tx.origin_address()) { + if *nonce >= txinfo.tx.get_origin_nonce() { continue; } - if let Some(nonce) = mined_origin_nonces.get(&txinfo.tx.origin_address()) { - if *nonce >= txinfo.tx.get_origin_nonce() { - continue; - } - } - if let Some(sponsor_addr) = txinfo.tx.sponsor_address() { - if let Some(nonce) = mined_sponsor_nonces.get(&sponsor_addr) { - if let Some(sponsor_nonce) = txinfo.tx.get_sponsor_nonce() { - if *nonce >= sponsor_nonce { - continue; - } + } + if let Some(sponsor_addr) = txinfo.tx.sponsor_address() { + if let Some(nonce) = mined_sponsor_nonces.get(&sponsor_addr) { + if let Some(sponsor_nonce) = txinfo.tx.get_sponsor_nonce() { + if *nonce >= sponsor_nonce { + continue; } } } + } - considered.insert(txinfo.tx.txid()); - - match builder.try_mine_tx_with_len( - &mut epoch_tx, - &txinfo.tx, - txinfo.metadata.len, - &block_limit_hit, - ) { - Ok(_) => {} - Err(Error::BlockTooBigError) => { - // done mining -- our execution budget is exceeded. - // Make the block from the transactions we did manage to get - debug!("Block budget exceeded on tx {}", &txinfo.tx.txid()); - if block_limit_hit == BlockLimitFunction::NO_LIMIT_HIT { - block_limit_hit = BlockLimitFunction::CONTRACT_LIMIT_HIT; - continue; - } else if block_limit_hit == BlockLimitFunction::CONTRACT_LIMIT_HIT { - block_limit_hit = BlockLimitFunction::LIMIT_REACHED; - } - } - Err(Error::TransactionTooBigError) => { - invalidated_txs.push(txinfo.metadata.txid); - if block_limit_hit == BlockLimitFunction::NO_LIMIT_HIT { - block_limit_hit = BlockLimitFunction::CONTRACT_LIMIT_HIT; - continue; - } else if block_limit_hit == BlockLimitFunction::CONTRACT_LIMIT_HIT { - block_limit_hit = BlockLimitFunction::LIMIT_REACHED; - } - } - Err(Error::InvalidStacksTransaction(_, true)) => { - // if we have an invalid transaction that was quietly ignored, don't warn here either + considered.insert(txinfo.tx.txid()); + + match builder.try_mine_tx_with_len( + &mut epoch_tx, + &txinfo.tx, + txinfo.metadata.len, + &block_limit_hit, + ) { + Ok(_) => {} + Err(Error::BlockTooBigError) => { + // done mining -- our execution budget is exceeded. + // Make the block from the transactions we did manage to get + debug!("Block budget exceeded on tx {}", &txinfo.tx.txid()); + if block_limit_hit == BlockLimitFunction::NO_LIMIT_HIT { + block_limit_hit = BlockLimitFunction::CONTRACT_LIMIT_HIT; continue; + } else if block_limit_hit == BlockLimitFunction::CONTRACT_LIMIT_HIT { + block_limit_hit = BlockLimitFunction::LIMIT_REACHED; } - Err(e) => { - warn!("Failed to apply tx {}: {:?}", &txinfo.tx.txid(), &e); + } + Err(Error::TransactionTooBigError) => { + invalidated_txs.push(txinfo.metadata.txid); + if block_limit_hit == BlockLimitFunction::NO_LIMIT_HIT { + block_limit_hit = BlockLimitFunction::CONTRACT_LIMIT_HIT; continue; + } else if block_limit_hit == BlockLimitFunction::CONTRACT_LIMIT_HIT { + block_limit_hit = BlockLimitFunction::LIMIT_REACHED; } } - - mined_origin_nonces - .insert(txinfo.tx.origin_address(), txinfo.tx.get_origin_nonce()); - if let (Some(sponsor_addr), Some(sponsor_nonce)) = - (txinfo.tx.sponsor_address(), txinfo.tx.get_sponsor_nonce()) - { - mined_sponsor_nonces.insert(sponsor_addr, sponsor_nonce); + Err(Error::InvalidStacksTransaction(_, true)) => { + // if we have an invalid transaction that was quietly ignored, don't warn here either + continue; + } + Err(e) => { + warn!("Failed to apply tx {}: {:?}", &txinfo.tx.txid(), &e); + continue; } } - Ok(()) - }, - ); + + mined_origin_nonces + .insert(txinfo.tx.origin_address(), txinfo.tx.get_origin_nonce()); + if let (Some(sponsor_addr), Some(sponsor_nonce)) = + (txinfo.tx.sponsor_address(), txinfo.tx.get_sponsor_nonce()) + { + mined_sponsor_nonces.insert(sponsor_addr, sponsor_nonce); + } + } + Ok(()) + }); mempool.drop_txs(&invalidated_txs)?; if let Some(observer) = event_observer { diff --git a/src/core/mempool.rs b/src/core/mempool.rs index 7755be029e..c0457ef484 100644 --- a/src/core/mempool.rs +++ b/src/core/mempool.rs @@ -40,13 +40,13 @@ use std::fs; use std::io::Read; use std::path::{Path, PathBuf}; -use util::db::query_row; use util::db::query_rows; use util::db::tx_begin_immediate; use util::db::tx_busy_handler; use util::db::u64_to_sql; use util::db::Error as db_error; use util::db::FromColumn; +use util::db::{query_row, Error}; use util::db::{sql_pragma, DBConn, DBTx, FromRow}; use util::get_epoch_time_secs; @@ -196,6 +196,24 @@ impl FromRow for MemPoolTxInfo { } } +impl FromRow<(u64, u64)> for (u64, u64) { + fn from_row<'a>(row: &'a Row) -> Result<(u64, u64), db_error> { + let t1: i64 = row.get_unwrap(0); + let t2: i64 = row.get_unwrap(1); + if t1 < 0 || t2 < 0 { + return Err(db_error::ParseError); + } + Ok((t1 as u64, t2 as u64)) + } +} + +pub fn u32_to_sql(x: u32) -> Result { + if x > (i32::max_value() as u32) { + return Err(Error::ParseError); + } + Ok(x as i32) +} + const MEMPOOL_INITIAL_SCHEMA: &'static [&'static str] = &[ r#" CREATE TABLE mempool( @@ -255,39 +273,6 @@ impl<'a> MemPoolTx<'a> { pub fn commit(self) -> Result<(), db_error> { self.tx.commit().map_err(db_error::SqliteError) } - - fn are_blocks_in_same_fork( - &mut self, - chainstate: &mut StacksChainState, - first_consensus_hash: &ConsensusHash, - first_stacks_block: &BlockHeaderHash, - second_consensus_hash: &ConsensusHash, - second_stacks_block: &BlockHeaderHash, - ) -> Result { - let first_block = - StacksBlockHeader::make_index_block_hash(first_consensus_hash, first_stacks_block); - let second_block = - StacksBlockHeader::make_index_block_hash(second_consensus_hash, second_stacks_block); - // short circuit equality - if second_block == first_block { - return Ok(true); - } - - let height_of_first_with_second_tip = chainstate - .with_clarity_marf(|marf| marf.get_block_height_of(&first_block, &second_block)); - let height_of_second_with_first_tip = chainstate - .with_clarity_marf(|marf| marf.get_block_height_of(&second_block, &first_block)); - - match ( - height_of_first_with_second_tip, - height_of_second_with_first_tip, - ) { - (Ok(None), Ok(None)) => Ok(false), - (Ok(_), Ok(_)) => Ok(true), - (Err(x), _) => Err(db_error::IndexError(x)), - (_, Err(x)) => Err(db_error::IndexError(x)), - } - } } impl MemPoolTxInfo { @@ -415,14 +400,7 @@ impl MemPoolDB { /// /// Consider transactions across all forks where the transactions have /// height >= max(0, tip_height - MEMPOOL_MAX_TRANSACTION_AGE) and height <= tip_height. - pub fn iterate_candidates( - &self, - tip_consensus_hash: &ConsensusHash, - tip_block_hash: &BlockHeaderHash, - tip_height: u64, - chainstate: &mut StacksChainState, - mut todo: F, - ) -> Result<(), E> + pub fn iterate_candidates(&self, tip_height: u64, mut todo: F) -> Result<(), E> where F: FnMut(Vec) -> Result<(), E>, E: From + From, @@ -434,14 +412,16 @@ impl MemPoolDB { Some(h) if h == 0 => None, Some(h) => Some(h - 1), }; + let mut curr_page = 0; - let (mut next_nonce, mut next_timestamp) = - match MemPoolDB::get_next_nonce(&self.db, min_height, tip_height, None)? { - None => { - return Ok(()); - } - Some((nonce, ts)) => (nonce, ts), - }; + let (mut next_nonce, mut next_timestamp) = match MemPoolDB::get_next_nonce_and_timestamp( + &self.db, min_height, tip_height, None, + )? { + None => { + return Ok(()); + } + Some((nonce, ts)) => (nonce, ts), + }; loop { let available_txs = MemPoolDB::get_txs_at_nonce_and_timestamp( @@ -450,6 +430,7 @@ impl MemPoolDB { tip_height, next_nonce, next_timestamp, + curr_page, )?; debug!( "Have {} transactions at nonce={} at or after {}", @@ -458,32 +439,37 @@ impl MemPoolDB { next_timestamp ); - todo(available_txs)?; - next_timestamp = match MemPoolDB::get_next_timestamp_at_nonce( - &self.db, - min_height, - tip_height, - next_timestamp, - next_nonce, - )? { - Some(ts) => ts, - None => { - match MemPoolDB::get_next_nonce( - &self.db, - min_height, - tip_height, - Some(next_nonce), - )? { - None => { - return Ok(()); - } - Some((nonce, ts)) => { - next_nonce = nonce; - ts + if available_txs.len() > 0 { + todo(available_txs)?; + curr_page += 1; + } else { + curr_page = 0; + next_timestamp = match MemPoolDB::get_next_timestamp_at_nonce( + &self.db, + min_height, + tip_height, + next_timestamp, + next_nonce, + )? { + Some(ts) => ts, + None => { + match MemPoolDB::get_next_nonce_and_timestamp( + &self.db, + min_height, + tip_height, + Some(next_nonce), + )? { + None => { + return Ok(()); + } + Some((nonce, ts)) => { + next_nonce = nonce; + ts + } } } - } - }; + }; + } } } @@ -535,7 +521,7 @@ impl MemPoolDB { } /// Get the next nonce/timestamp after this nonce that occurs in the height range. - pub fn get_next_nonce( + pub fn get_next_nonce_and_timestamp( conn: &DBConn, min_height: Option, max_height: u64, @@ -563,7 +549,7 @@ impl MemPoolDB { max_height: u64, timestamp: u64, nonce: u64, - ) -> Result, db_error> { + ) -> Result, db_error> { let min_height_sql_arg = match min_height { None => -1, Some(h) => u64_to_sql(h)?, @@ -588,19 +574,23 @@ impl MemPoolDB { max_height: u64, nonce: u64, timestamp: u64, + curr_page: u32, ) -> Result, db_error> { let min_height_sql_arg = match min_height { None => -1, Some(h) => u64_to_sql(h)?, }; + let limit = 200; let sql = "SELECT * FROM mempool WHERE origin_nonce = ?1 AND accept_time = ?2 AND \ - height > ?3 AND height <= ?4 ORDER BY sponsor_nonce ASC"; + height > ?3 AND height <= ?4 ORDER BY sponsor_nonce ASC LIMIT ?5 OFFSET ?6"; let args: &[&dyn ToSql] = &[ &u64_to_sql(nonce)?, &u64_to_sql(timestamp)?, &min_height_sql_arg, &u64_to_sql(max_height)?, + &u32_to_sql(limit)?, + &u32_to_sql(limit * curr_page)?, ]; let rows = query_rows::(conn, &sql, args)?; Ok(rows) @@ -700,6 +690,45 @@ impl MemPoolDB { Ok(cmp::max(as_origin, as_sponsor)) } + fn are_blocks_in_same_fork( + chainstate: &mut StacksChainState, + first_consensus_hash: &ConsensusHash, + first_stacks_block: &BlockHeaderHash, + second_consensus_hash: &ConsensusHash, + second_stacks_block: &BlockHeaderHash, + ) -> Result { + let first_block = + StacksBlockHeader::make_index_block_hash(first_consensus_hash, first_stacks_block); + let second_block = + StacksBlockHeader::make_index_block_hash(second_consensus_hash, second_stacks_block); + // short circuit equality + if second_block == first_block { + return Ok(true); + } + + let headers_conn = chainstate + .index_conn() + .map_err(|_e| db_error::Other("ChainstateError".to_string()))?; + let height_of_first_with_second_tip = StacksChainState::get_block_height_of_ancestor( + &headers_conn, + &second_block, + &first_block, + )?; + let height_of_second_with_first_tip = StacksChainState::get_block_height_of_ancestor( + &headers_conn, + &first_block, + &second_block, + )?; + + match ( + height_of_first_with_second_tip, + height_of_second_with_first_tip, + ) { + (None, None) => Ok(false), + (_, _) => Ok(true), + } + } + /// Add a transaction to the mempool. If it already exists, then replace it if the given fee /// is higher than the one that's already there. /// Carry out the mempool admission test before adding. @@ -742,7 +771,7 @@ impl MemPoolDB { // is this a replace-by-fee ? replace_reason = MemPoolDropReason::REPLACE_BY_FEE; true - } else if !tx.are_blocks_in_same_fork( + } else if !MemPoolDB::are_blocks_in_same_fork( chainstate, &prior_tx.consensus_hash, &prior_tx.block_header_hash, @@ -1103,83 +1132,83 @@ mod tests { let _mempool = MemPoolDB::open(false, 0x80000000, &chainstate_path).unwrap(); } - #[test] - fn mempool_walk_over_fork() { - let mut chainstate = instantiate_chainstate_with_balances( - false, - 0x80000000, - "mempool_walk_over_fork", - vec![], - ); - - fn make_block( - chainstate: &mut StacksChainState, - block_consensus: ConsensusHash, - parent: &(ConsensusHash, BlockHeaderHash), - burn_height: u64, - block_height: u64, - ) -> (ConsensusHash, BlockHeaderHash) { - let (mut chainstate_tx, clar_tx) = chainstate.chainstate_tx_begin().unwrap(); - - let anchored_header = StacksBlockHeader { - version: 1, - total_work: StacksWorkScore { - work: block_height, - burn: 1, - }, - proof: VRFProof::empty(), - parent_block: parent.1.clone(), - parent_microblock: BlockHeaderHash([0; 32]), - parent_microblock_sequence: 0, - tx_merkle_root: Sha512Trunc256Sum::empty(), - state_index_root: TrieHash::from_empty_data(), - microblock_pubkey_hash: Hash160([0; 20]), - }; + fn make_block( + chainstate: &mut StacksChainState, + block_consensus: ConsensusHash, + parent: &(ConsensusHash, BlockHeaderHash), + burn_height: u64, + block_height: u64, + ) -> (ConsensusHash, BlockHeaderHash) { + let (mut chainstate_tx, clar_tx) = chainstate.chainstate_tx_begin().unwrap(); - let block_hash = anchored_header.block_hash(); + let anchored_header = StacksBlockHeader { + version: 1, + total_work: StacksWorkScore { + work: block_height, + burn: 1, + }, + proof: VRFProof::empty(), + parent_block: parent.1.clone(), + parent_microblock: BlockHeaderHash([0; 32]), + parent_microblock_sequence: 0, + tx_merkle_root: Sha512Trunc256Sum::empty(), + state_index_root: TrieHash::from_empty_data(), + microblock_pubkey_hash: Hash160([0; 20]), + }; - let c_tx = StacksChainState::chainstate_block_begin( - &chainstate_tx, - clar_tx, - &NULL_BURN_STATE_DB, - &parent.0, - &parent.1, - &block_consensus, - &block_hash, - ); + let block_hash = anchored_header.block_hash(); - let new_tip_info = StacksHeaderInfo { - anchored_header, - microblock_tail: None, - index_root: TrieHash::from_empty_data(), - block_height, - consensus_hash: block_consensus.clone(), - burn_header_hash: BurnchainHeaderHash([0; 32]), - burn_header_height: burn_height as u32, - burn_header_timestamp: 0, - anchored_block_size: 1, - }; + let c_tx = StacksChainState::chainstate_block_begin( + &chainstate_tx, + clar_tx, + &NULL_BURN_STATE_DB, + &parent.0, + &parent.1, + &block_consensus, + &block_hash, + ); - c_tx.commit_block(); + let new_tip_info = StacksHeaderInfo { + anchored_header, + microblock_tail: None, + index_root: TrieHash::from_empty_data(), + block_height, + consensus_hash: block_consensus.clone(), + burn_header_hash: BurnchainHeaderHash([0; 32]), + burn_header_height: burn_height as u32, + burn_header_timestamp: 0, + anchored_block_size: 1, + }; - let new_index_hash = StacksBlockId::new(&block_consensus, &block_hash); + c_tx.commit_block(); - chainstate_tx - .put_indexed_begin(&StacksBlockId::new(&parent.0, &parent.1), &new_index_hash) - .unwrap(); + let new_index_hash = StacksBlockId::new(&block_consensus, &block_hash); - StacksChainState::insert_stacks_block_header( - &mut chainstate_tx, - &new_index_hash, - &new_tip_info, - &ExecutionCost::zero(), - ) + chainstate_tx + .put_indexed_begin(&StacksBlockId::new(&parent.0, &parent.1), &new_index_hash) .unwrap(); - chainstate_tx.commit().unwrap(); + StacksChainState::insert_stacks_block_header( + &mut chainstate_tx, + &new_index_hash, + &new_tip_info, + &ExecutionCost::zero(), + ) + .unwrap(); + + chainstate_tx.commit().unwrap(); - (block_consensus, block_hash) - } + (block_consensus, block_hash) + } + + #[test] + fn mempool_walk_over_fork() { + let mut chainstate = instantiate_chainstate_with_balances( + false, + 0x80000000, + "mempool_walk_over_fork", + vec![], + ); // genesis -> b_1* -> b_2* // \-> b_3 -> b_4 @@ -1276,16 +1305,10 @@ mod tests { let mut count_txs = 0; mempool - .iterate_candidates::<_, ChainstateError>( - &b_2.0, - &b_2.1, - 2, - &mut chainstate, - |available_txs| { - count_txs += available_txs.len(); - Ok(()) - }, - ) + .iterate_candidates::<_, ChainstateError>(2, |available_txs| { + count_txs += available_txs.len(); + Ok(()) + }) .unwrap(); assert_eq!( count_txs, 2, @@ -1294,16 +1317,10 @@ mod tests { let mut count_txs = 0; mempool - .iterate_candidates::<_, ChainstateError>( - &b_5.0, - &b_5.1, - 3, - &mut chainstate, - |available_txs| { - count_txs += available_txs.len(); - Ok(()) - }, - ) + .iterate_candidates::<_, ChainstateError>(3, |available_txs| { + count_txs += available_txs.len(); + Ok(()) + }) .unwrap(); assert_eq!( count_txs, 3, @@ -1312,16 +1329,10 @@ mod tests { let mut count_txs = 0; mempool - .iterate_candidates::<_, ChainstateError>( - &b_3.0, - &b_3.1, - 2, - &mut chainstate, - |available_txs| { - count_txs += available_txs.len(); - Ok(()) - }, - ) + .iterate_candidates::<_, ChainstateError>(2, |available_txs| { + count_txs += available_txs.len(); + Ok(()) + }) .unwrap(); assert_eq!( count_txs, 2, @@ -1330,16 +1341,10 @@ mod tests { let mut count_txs = 0; mempool - .iterate_candidates::<_, ChainstateError>( - &b_4.0, - &b_4.1, - 3, - &mut chainstate, - |available_txs| { - count_txs += available_txs.len(); - Ok(()) - }, - ) + .iterate_candidates::<_, ChainstateError>(3, |available_txs| { + count_txs += available_txs.len(); + Ok(()) + }) .unwrap(); assert_eq!( count_txs, 3, @@ -1467,65 +1472,19 @@ mod tests { // genesis -> b_1 -> b_2 // \-> b_3 - - let b_1 = (ConsensusHash([0x1; 20]), BlockHeaderHash([0x4; 32])); - let b_2 = (ConsensusHash([0x2; 20]), BlockHeaderHash([0x5; 32])); - let b_3 = (ConsensusHash([0x3; 20]), BlockHeaderHash([0x6; 32])); - - eprintln!( - "b_1 => {}", - &StacksBlockHeader::make_index_block_hash(&b_1.0, &b_1.1) - ); - eprintln!( - "b_2 => {}", - &StacksBlockHeader::make_index_block_hash(&b_2.0, &b_2.1) - ); - eprintln!( - "b_3 => {}", - &StacksBlockHeader::make_index_block_hash(&b_3.0, &b_3.1) + // + let b_1 = make_block( + &mut chainstate, + ConsensusHash([0x1; 20]), + &( + FIRST_BURNCHAIN_CONSENSUS_HASH.clone(), + FIRST_STACKS_BLOCK_HASH.clone(), + ), + 1, + 1, ); - - { - let (chainstate_tx, clar_tx) = chainstate.chainstate_tx_begin().unwrap(); - let c_tx = StacksChainState::chainstate_block_begin( - &chainstate_tx, - clar_tx, - &NULL_BURN_STATE_DB, - &FIRST_BURNCHAIN_CONSENSUS_HASH, - &FIRST_STACKS_BLOCK_HASH, - &b_1.0, - &b_1.1, - ); - c_tx.commit_block(); - } - - { - let (chainstate_tx, clar_tx) = chainstate.chainstate_tx_begin().unwrap(); - let c_tx = StacksChainState::chainstate_block_begin( - &chainstate_tx, - clar_tx, - &NULL_BURN_STATE_DB, - &FIRST_BURNCHAIN_CONSENSUS_HASH, - &FIRST_STACKS_BLOCK_HASH, - &b_3.0, - &b_3.1, - ); - c_tx.commit_block(); - } - - { - let (chainstate_tx, clar_tx) = chainstate.chainstate_tx_begin().unwrap(); - let c_tx = StacksChainState::chainstate_block_begin( - &chainstate_tx, - clar_tx, - &NULL_BURN_STATE_DB, - &b_1.0, - &b_1.1, - &b_2.0, - &b_2.1, - ); - c_tx.commit_block(); - } + let b_2 = make_block(&mut chainstate, ConsensusHash([0x2; 20]), &b_1, 2, 2); + let b_3 = make_block(&mut chainstate, ConsensusHash([0x3; 20]), &b_1, 1, 1); let chainstate_path = chainstate_path("mempool_do_not_replace_tx"); let mut mempool = MemPoolDB::open(false, 0x80000000, &chainstate_path).unwrap(); diff --git a/src/util/db.rs b/src/util/db.rs index 38138b36c9..ab9388a39e 100644 --- a/src/util/db.rs +++ b/src/util/db.rs @@ -169,17 +169,6 @@ impl FromRow for u64 { } } -impl FromRow<(u64, u64)> for (u64, u64) { - fn from_row<'a>(row: &'a Row) -> Result<(u64, u64), Error> { - let t1: i64 = row.get_unwrap(0); - let t2: i64 = row.get_unwrap(1); - if t1 < 0 || t2 < 0 { - return Err(Error::ParseError); - } - Ok((t1 as u64, t2 as u64)) - } -} - impl FromColumn for u64 { fn from_column<'a>(row: &'a Row, column_name: &str) -> Result { let x: i64 = row.get_unwrap(column_name); diff --git a/testnet/stacks-node/src/tests/integrations.rs b/testnet/stacks-node/src/tests/integrations.rs index c86f7da0fc..fe28647dad 100644 --- a/testnet/stacks-node/src/tests/integrations.rs +++ b/testnet/stacks-node/src/tests/integrations.rs @@ -1034,7 +1034,7 @@ fn mine_transactions_out_of_order() { conf.burnchain.commit_anchor_block_within = 5000; conf.add_initial_balance(addr.to_string(), 100000); - let num_rounds = 6; + let num_rounds = 5; let mut run_loop = RunLoop::new(conf); run_loop @@ -1131,40 +1131,13 @@ fn mine_transactions_out_of_order() { } 4 => { assert_eq!(chain_tip.metadata.block_height, 5); - assert_eq!(chain_tip.block.txs.len(), 2); + assert_eq!(chain_tip.block.txs.len(), 5); // check that 1000 stx _was_ transfered to the contract principal let curr_tip = ( chain_tip.metadata.consensus_hash.clone(), chain_tip.metadata.anchored_header.block_hash(), ); - assert_eq!( - chain_state - .with_read_only_clarity_tx( - burn_dbconn, - &StacksBlockHeader::make_index_block_hash(&curr_tip.0, &curr_tip.1), - |conn| { - conn.with_clarity_db_readonly(|db| { - db.get_account_stx_balance( - &contract_identifier.clone().into(), - ) - .amount_unlocked - }) - } - ) - .unwrap(), - 1000 - ); - } - 5 => { - assert_eq!(chain_tip.metadata.block_height, 6); - assert_eq!(chain_tip.block.txs.len(), 4); - - // check that 3000 stx _was_ transferred to the contract principal total - let curr_tip = ( - chain_tip.metadata.consensus_hash.clone(), - chain_tip.metadata.anchored_header.block_hash(), - ); assert_eq!( chain_state .with_read_only_clarity_tx( From ffc930fa690f0528ad7e3c5c0b9839f009943dba Mon Sep 17 00:00:00 2001 From: Pavitthra Pandurangan Date: Fri, 9 Apr 2021 13:44:48 -0400 Subject: [PATCH 7/9] Simplified logic for iterate by removing timestamps from consideration --- src/chainstate/stacks/db/headers.rs | 9 --- src/core/mempool.rs | 111 +++++++--------------------- 2 files changed, 27 insertions(+), 93 deletions(-) diff --git a/src/chainstate/stacks/db/headers.rs b/src/chainstate/stacks/db/headers.rs index e87a1f7880..e5c4158445 100644 --- a/src/chainstate/stacks/db/headers.rs +++ b/src/chainstate/stacks/db/headers.rs @@ -307,15 +307,6 @@ impl StacksChainState { Ok(ancestors) } - /// Get an ancestor's height given the tip index hash and the ancestor index hash - pub fn get_block_height_of_ancestor( - conn: &StacksDBConn, - tip_index_hash: &StacksBlockId, - ancestor_index_hash: &StacksBlockId, - ) -> Result, db_error> { - conn.get_ancestor_block_height(ancestor_index_hash, tip_index_hash) - } - /// Get the genesis (boot code) block header pub fn get_genesis_header_info(conn: &Connection) -> Result { // by construction, only one block can have height 0 in this DB diff --git a/src/core/mempool.rs b/src/core/mempool.rs index c0457ef484..2228481d3f 100644 --- a/src/core/mempool.rs +++ b/src/core/mempool.rs @@ -406,37 +406,26 @@ impl MemPoolDB { E: From + From, { // Want to consider transactions with - // height > max(0, `tip_height - MEMPOOL_MAX_TRANSACTION_AGE`) - 1 - let min_height = match tip_height.checked_sub(MEMPOOL_MAX_TRANSACTION_AGE) { - None => None, - Some(h) if h == 0 => None, - Some(h) => Some(h - 1), - }; + // height > max(-1, tip_height - (MEMPOOL_MAX_TRANSACTION_AGE + 1)) + let min_height = tip_height.checked_sub(MEMPOOL_MAX_TRANSACTION_AGE + 1); let mut curr_page = 0; - let (mut next_nonce, mut next_timestamp) = match MemPoolDB::get_next_nonce_and_timestamp( - &self.db, min_height, tip_height, None, - )? { - None => { - return Ok(()); - } - Some((nonce, ts)) => (nonce, ts), - }; + let mut next_nonce = + match MemPoolDB::get_next_nonce(&self.db, min_height, tip_height, None)? { + None => { + return Ok(()); + } + Some(nonce) => nonce, + }; loop { - let available_txs = MemPoolDB::get_txs_at_nonce_and_timestamp( - &self.db, - min_height, - tip_height, - next_nonce, - next_timestamp, - curr_page, + let available_txs = MemPoolDB::get_txs_at_nonce_and_offset( + &self.db, min_height, tip_height, next_nonce, curr_page, )?; debug!( - "Have {} transactions at nonce={} at or after {}", + "Have {} transactions at nonce={}", available_txs.len(), next_nonce, - next_timestamp ); if available_txs.len() > 0 { @@ -444,30 +433,16 @@ impl MemPoolDB { curr_page += 1; } else { curr_page = 0; - next_timestamp = match MemPoolDB::get_next_timestamp_at_nonce( + next_nonce = match MemPoolDB::get_next_nonce( &self.db, min_height, tip_height, - next_timestamp, - next_nonce, + Some(next_nonce), )? { - Some(ts) => ts, None => { - match MemPoolDB::get_next_nonce_and_timestamp( - &self.db, - min_height, - tip_height, - Some(next_nonce), - )? { - None => { - return Ok(()); - } - Some((nonce, ts)) => { - next_nonce = nonce; - ts - } - } + return Ok(()); } + Some(nonce) => nonce, }; } } @@ -521,12 +496,12 @@ impl MemPoolDB { } /// Get the next nonce/timestamp after this nonce that occurs in the height range. - pub fn get_next_nonce_and_timestamp( + pub fn get_next_nonce( conn: &DBConn, min_height: Option, max_height: u64, nonce: Option, - ) -> Result, db_error> { + ) -> Result, db_error> { let nonce = match nonce { None => -1, Some(n) => u64_to_sql(n)?, @@ -535,45 +510,20 @@ impl MemPoolDB { None => -1, Some(h) => u64_to_sql(h)?, }; - let sql = "SELECT origin_nonce, accept_time FROM mempool WHERE origin_nonce > ?1 AND \ + let sql = "SELECT origin_nonce FROM mempool WHERE origin_nonce > ?1 AND \ height > ?2 AND height <= ?3 ORDER BY origin_nonce, accept_time ASC LIMIT 1"; let args: &[&dyn ToSql] = &[&nonce, &min_height_sql_arg, &u64_to_sql(max_height)?]; query_row(conn, sql, args) } - /// Get the next timestamp after this one that occurs with this nonce in the height range. - pub fn get_next_timestamp_at_nonce( - conn: &DBConn, - min_height: Option, - max_height: u64, - timestamp: u64, - nonce: u64, - ) -> Result, db_error> { - let min_height_sql_arg = match min_height { - None => -1, - Some(h) => u64_to_sql(h)?, - }; - - let sql = "SELECT accept_time FROM mempool WHERE accept_time > ?1 AND \ - origin_nonce = ?2 AND height > ?3 AND height <= ?4 ORDER BY accept_time ASC LIMIT 1"; - let args: &[&dyn ToSql] = &[ - &u64_to_sql(timestamp)?, - &u64_to_sql(nonce)?, - &min_height_sql_arg, - &u64_to_sql(max_height)?, - ]; - query_row(conn, sql, args) - } - /// Get all transactions at a particular nonce and timestamp on a given chain tip. /// Order them by sponsor nonce. - pub fn get_txs_at_nonce_and_timestamp( + pub fn get_txs_at_nonce_and_offset( conn: &DBConn, min_height: Option, max_height: u64, nonce: u64, - timestamp: u64, curr_page: u32, ) -> Result, db_error> { let min_height_sql_arg = match min_height { @@ -582,11 +532,10 @@ impl MemPoolDB { }; let limit = 200; - let sql = "SELECT * FROM mempool WHERE origin_nonce = ?1 AND accept_time = ?2 AND \ - height > ?3 AND height <= ?4 ORDER BY sponsor_nonce ASC LIMIT ?5 OFFSET ?6"; + let sql = "SELECT * FROM mempool WHERE origin_nonce = ?1 AND \ + height > ?2 AND height <= ?3 ORDER BY sponsor_nonce ASC LIMIT ?4 OFFSET ?5"; let args: &[&dyn ToSql] = &[ &u64_to_sql(nonce)?, - &u64_to_sql(timestamp)?, &min_height_sql_arg, &u64_to_sql(max_height)?, &u32_to_sql(limit)?, @@ -706,19 +655,13 @@ impl MemPoolDB { return Ok(true); } - let headers_conn = chainstate + let headers_conn = &chainstate .index_conn() .map_err(|_e| db_error::Other("ChainstateError".to_string()))?; - let height_of_first_with_second_tip = StacksChainState::get_block_height_of_ancestor( - &headers_conn, - &second_block, - &first_block, - )?; - let height_of_second_with_first_tip = StacksChainState::get_block_height_of_ancestor( - &headers_conn, - &first_block, - &second_block, - )?; + let height_of_first_with_second_tip = + headers_conn.get_ancestor_block_height(&second_block, &first_block)?; + let height_of_second_with_first_tip = + headers_conn.get_ancestor_block_height(&first_block, &second_block)?; match ( height_of_first_with_second_tip, From 9485e155e1f4980760675d67c3cdc6f7fddd9309 Mon Sep 17 00:00:00 2001 From: Pavitthra Pandurangan Date: Wed, 14 Apr 2021 15:06:28 -0400 Subject: [PATCH 8/9] Removed unnecessary u32_to_sql fn --- src/core/mempool.rs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/core/mempool.rs b/src/core/mempool.rs index 2228481d3f..f19e6ccdef 100644 --- a/src/core/mempool.rs +++ b/src/core/mempool.rs @@ -207,13 +207,6 @@ impl FromRow<(u64, u64)> for (u64, u64) { } } -pub fn u32_to_sql(x: u32) -> Result { - if x > (i32::max_value() as u32) { - return Err(Error::ParseError); - } - Ok(x as i32) -} - const MEMPOOL_INITIAL_SCHEMA: &'static [&'static str] = &[ r#" CREATE TABLE mempool( @@ -531,6 +524,7 @@ impl MemPoolDB { Some(h) => u64_to_sql(h)?, }; let limit = 200; + let offset = limit * curr_page; let sql = "SELECT * FROM mempool WHERE origin_nonce = ?1 AND \ height > ?2 AND height <= ?3 ORDER BY sponsor_nonce ASC LIMIT ?4 OFFSET ?5"; @@ -538,8 +532,8 @@ impl MemPoolDB { &u64_to_sql(nonce)?, &min_height_sql_arg, &u64_to_sql(max_height)?, - &u32_to_sql(limit)?, - &u32_to_sql(limit * curr_page)?, + &limit, + &offset, ]; let rows = query_rows::(conn, &sql, args)?; Ok(rows) From 22d42168220c0f7e56cb6a05862d11f45be0b80e Mon Sep 17 00:00:00 2001 From: Pavitthra Pandurangan Date: Mon, 26 Apr 2021 13:56:54 -0600 Subject: [PATCH 9/9] Fix redundant import --- src/core/mempool.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/core/mempool.rs b/src/core/mempool.rs index c273a92b74..47fe146aa4 100644 --- a/src/core/mempool.rs +++ b/src/core/mempool.rs @@ -41,7 +41,6 @@ use core::FIRST_BURNCHAIN_CONSENSUS_HASH; use core::FIRST_STACKS_BLOCK_HASH; use monitoring::increment_stx_mempool_gc; use net::StacksMessageCodec; -use util::db::query_row; use util::db::query_rows; use util::db::tx_begin_immediate; use util::db::tx_busy_handler;