diff --git a/src/chainstate/stacks/db/headers.rs b/src/chainstate/stacks/db/headers.rs index 87a2aaca86..9bc4360f37 100644 --- a/src/chainstate/stacks/db/headers.rs +++ b/src/chainstate/stacks/db/headers.rs @@ -279,23 +279,6 @@ impl StacksChainState { } } - /// Get an ancestor block header given an index hash - pub fn get_index_tip_ancestor_conn( - conn: &StacksDBConn, - tip_index_hash: &StacksBlockId, - height: u64, - ) -> Result, Error> { - match conn - .get_ancestor_block_hash(height, tip_index_hash) - .map_err(Error::DBError)? - { - Some(bhh) => { - StacksChainState::get_stacks_block_header_info_by_index_block_hash(conn, &bhh) - } - None => Ok(None), - } - } - /// Get a segment of headers from the canonical chain pub fn get_ancestors_headers( conn: &Connection, diff --git a/src/chainstate/stacks/miner.rs b/src/chainstate/stacks/miner.rs index a53a734e44..e7108c51e8 100644 --- a/src/chainstate/stacks/miner.rs +++ b/src/chainstate/stacks/miner.rs @@ -437,44 +437,38 @@ impl<'a> StacksMicroblockBuilder<'a> { let mut bytes_so_far = self.runtime.bytes_so_far; let mut num_txs = self.runtime.num_mined; - let result = mem_pool.iterate_candidates( - &self.anchor_block_consensus_hash, - &self.anchor_block, - self.anchor_block_height, - &mut self.header_reader, - |micro_txs| { - let mut result = Ok(()); - for mempool_tx in micro_txs.into_iter() { - match StacksMicroblockBuilder::mine_next_transaction( - &mut clarity_tx, - mempool_tx.tx.clone(), - mempool_tx.metadata.len, - &mut considered, - bytes_so_far, - ) { - Ok(true) => { - bytes_so_far += mempool_tx.metadata.len; - - debug!( - "Include tx {} ({}) in microblock", - mempool_tx.tx.txid(), - mempool_tx.tx.payload.name() - ); - txs_included.push(mempool_tx.tx); - num_txs += 1; - } - Ok(false) => { - continue; - } - Err(e) => { - result = Err(e); - break; - } + let result = mem_pool.iterate_candidates(self.anchor_block_height, |micro_txs| { + let mut result = Ok(()); + for mempool_tx in micro_txs.into_iter() { + match StacksMicroblockBuilder::mine_next_transaction( + &mut clarity_tx, + mempool_tx.tx.clone(), + mempool_tx.metadata.len, + &mut considered, + bytes_so_far, + ) { + Ok(true) => { + bytes_so_far += mempool_tx.metadata.len; + + debug!( + "Include tx {} ({}) in microblock", + mempool_tx.tx.txid(), + mempool_tx.tx.payload.name() + ); + txs_included.push(mempool_tx.tx); + num_txs += 1; + } + Ok(false) => { + continue; + } + Err(e) => { + result = Err(e); + break; } } - result - }, - ); + } + result + }); // do fault injection if self.runtime.disable_bytes_check { @@ -1389,7 +1383,6 @@ impl StacksBlockBuilder { &tip_consensus_hash, &tip_block_hash, tip_height, execution_budget ); - let (mut header_reader_chainstate, _) = chainstate_handle.reopen()?; // used for reading block headers during an epoch let (mut chainstate, _) = chainstate_handle.reopen_limited(execution_budget)?; // used for processing a block up to the given limit let mut builder = StacksBlockBuilder::make_block_builder( @@ -1413,86 +1406,80 @@ impl StacksBlockBuilder { let mut block_limit_hit = BlockLimitFunction::NO_LIMIT_HIT; - let result = mempool.iterate_candidates( - &tip_consensus_hash, - &tip_block_hash, - tip_height, - &mut header_reader_chainstate, - |available_txs| { - if block_limit_hit == BlockLimitFunction::LIMIT_REACHED { - return Ok(()); - } + let result = mempool.iterate_candidates(tip_height, |available_txs| { + if block_limit_hit == BlockLimitFunction::LIMIT_REACHED { + return Ok(()); + } - for txinfo in available_txs.into_iter() { - // skip transactions early 
if we can - if considered.contains(&txinfo.tx.txid()) { + for txinfo in available_txs.into_iter() { + // skip transactions early if we can + if considered.contains(&txinfo.tx.txid()) { + continue; + } + if let Some(nonce) = mined_origin_nonces.get(&txinfo.tx.origin_address()) { + if *nonce >= txinfo.tx.get_origin_nonce() { continue; } - if let Some(nonce) = mined_origin_nonces.get(&txinfo.tx.origin_address()) { - if *nonce >= txinfo.tx.get_origin_nonce() { - continue; - } - } - if let Some(sponsor_addr) = txinfo.tx.sponsor_address() { - if let Some(nonce) = mined_sponsor_nonces.get(&sponsor_addr) { - if let Some(sponsor_nonce) = txinfo.tx.get_sponsor_nonce() { - if *nonce >= sponsor_nonce { - continue; - } + } + if let Some(sponsor_addr) = txinfo.tx.sponsor_address() { + if let Some(nonce) = mined_sponsor_nonces.get(&sponsor_addr) { + if let Some(sponsor_nonce) = txinfo.tx.get_sponsor_nonce() { + if *nonce >= sponsor_nonce { + continue; } } } + } - considered.insert(txinfo.tx.txid()); - - match builder.try_mine_tx_with_len( - &mut epoch_tx, - &txinfo.tx, - txinfo.metadata.len, - &block_limit_hit, - ) { - Ok(_) => {} - Err(Error::BlockTooBigError) => { - // done mining -- our execution budget is exceeded. - // Make the block from the transactions we did manage to get - debug!("Block budget exceeded on tx {}", &txinfo.tx.txid()); - if block_limit_hit == BlockLimitFunction::NO_LIMIT_HIT { - block_limit_hit = BlockLimitFunction::CONTRACT_LIMIT_HIT; - continue; - } else if block_limit_hit == BlockLimitFunction::CONTRACT_LIMIT_HIT { - block_limit_hit = BlockLimitFunction::LIMIT_REACHED; - } - } - Err(Error::TransactionTooBigError) => { - invalidated_txs.push(txinfo.metadata.txid); - if block_limit_hit == BlockLimitFunction::NO_LIMIT_HIT { - block_limit_hit = BlockLimitFunction::CONTRACT_LIMIT_HIT; - continue; - } else if block_limit_hit == BlockLimitFunction::CONTRACT_LIMIT_HIT { - block_limit_hit = BlockLimitFunction::LIMIT_REACHED; - } - } - Err(Error::InvalidStacksTransaction(_, true)) => { - // if we have an invalid transaction that was quietly ignored, don't warn here either + considered.insert(txinfo.tx.txid()); + + match builder.try_mine_tx_with_len( + &mut epoch_tx, + &txinfo.tx, + txinfo.metadata.len, + &block_limit_hit, + ) { + Ok(_) => {} + Err(Error::BlockTooBigError) => { + // done mining -- our execution budget is exceeded. 
+ // Make the block from the transactions we did manage to get + debug!("Block budget exceeded on tx {}", &txinfo.tx.txid()); + if block_limit_hit == BlockLimitFunction::NO_LIMIT_HIT { + block_limit_hit = BlockLimitFunction::CONTRACT_LIMIT_HIT; continue; + } else if block_limit_hit == BlockLimitFunction::CONTRACT_LIMIT_HIT { + block_limit_hit = BlockLimitFunction::LIMIT_REACHED; } - Err(e) => { - warn!("Failed to apply tx {}: {:?}", &txinfo.tx.txid(), &e); + } + Err(Error::TransactionTooBigError) => { + invalidated_txs.push(txinfo.metadata.txid); + if block_limit_hit == BlockLimitFunction::NO_LIMIT_HIT { + block_limit_hit = BlockLimitFunction::CONTRACT_LIMIT_HIT; continue; + } else if block_limit_hit == BlockLimitFunction::CONTRACT_LIMIT_HIT { + block_limit_hit = BlockLimitFunction::LIMIT_REACHED; } } - - mined_origin_nonces - .insert(txinfo.tx.origin_address(), txinfo.tx.get_origin_nonce()); - if let (Some(sponsor_addr), Some(sponsor_nonce)) = - (txinfo.tx.sponsor_address(), txinfo.tx.get_sponsor_nonce()) - { - mined_sponsor_nonces.insert(sponsor_addr, sponsor_nonce); + Err(Error::InvalidStacksTransaction(_, true)) => { + // if we have an invalid transaction that was quietly ignored, don't warn here either + continue; + } + Err(e) => { + warn!("Failed to apply tx {}: {:?}", &txinfo.tx.txid(), &e); + continue; } } - Ok(()) - }, - ); + + mined_origin_nonces + .insert(txinfo.tx.origin_address(), txinfo.tx.get_origin_nonce()); + if let (Some(sponsor_addr), Some(sponsor_nonce)) = + (txinfo.tx.sponsor_address(), txinfo.tx.get_sponsor_nonce()) + { + mined_sponsor_nonces.insert(sponsor_addr, sponsor_nonce); + } + } + Ok(()) + }); mempool.drop_txs(&invalidated_txs)?; if let Some(observer) = event_observer { diff --git a/src/core/mempool.rs b/src/core/mempool.rs index 0cabee5ffd..47fe146aa4 100644 --- a/src/core/mempool.rs +++ b/src/core/mempool.rs @@ -41,13 +41,13 @@ use core::FIRST_BURNCHAIN_CONSENSUS_HASH; use core::FIRST_STACKS_BLOCK_HASH; use monitoring::increment_stx_mempool_gc; use net::StacksMessageCodec; -use util::db::query_row; use util::db::query_rows; use util::db::tx_begin_immediate; use util::db::tx_busy_handler; use util::db::u64_to_sql; use util::db::Error as db_error; use util::db::FromColumn; +use util::db::{query_row, Error}; use util::db::{sql_pragma, DBConn, DBTx, FromRow}; use util::get_epoch_time_secs; use vm::types::PrincipalData; @@ -189,6 +189,17 @@ impl FromRow for MemPoolTxInfo { } } +impl FromRow<(u64, u64)> for (u64, u64) { + fn from_row<'a>(row: &'a Row) -> Result<(u64, u64), db_error> { + let t1: i64 = row.get_unwrap(0); + let t2: i64 = row.get_unwrap(1); + if t1 < 0 || t2 < 0 { + return Err(db_error::ParseError); + } + Ok((t1 as u64, t2 as u64)) + } +} + const MEMPOOL_INITIAL_SCHEMA: &'static [&'static str] = &[ r#" CREATE TABLE mempool( @@ -248,34 +259,6 @@ impl<'a> MemPoolTx<'a> { pub fn commit(self) -> Result<(), db_error> { self.tx.commit().map_err(db_error::SqliteError) } - - fn is_block_in_fork( - &mut self, - chainstate: &mut StacksChainState, - check_consensus_hash: &ConsensusHash, - check_stacks_block: &BlockHeaderHash, - cur_consensus_hash: &ConsensusHash, - cur_stacks_block: &BlockHeaderHash, - ) -> Result { - let admitter_block = - StacksBlockHeader::make_index_block_hash(cur_consensus_hash, cur_stacks_block); - let index_block = - StacksBlockHeader::make_index_block_hash(check_consensus_hash, check_stacks_block); - // short circuit equality - if admitter_block == index_block { - return Ok(true); - } - - let height_result = chainstate - 
.with_clarity_marf(|marf| marf.get_block_height_of(&index_block, &admitter_block)); - match height_result { - Ok(x) => { - eprintln!("{} from {} => {:?}", &index_block, &admitter_block, x); - Ok(x.is_some()) - } - Err(x) => Err(db_error::IndexError(x)), - } - } } impl MemPoolTxInfo { @@ -394,244 +377,60 @@ impl MemPoolDB { }) } - fn walk( - &self, - chainstate: &mut StacksChainState, - tip_consensus_hash: &ConsensusHash, - tip_block_hash: &BlockHeaderHash, - tip_height: u64, - ) -> Result { - // Walk back to the next-highest - // ancestor of this tip, and see if we can include anything from there. - let next_height = match MemPoolDB::get_previous_block_height(&self.db, tip_height)? { - Some(next_height) => next_height, - None => { - debug!("Done scanning mempool: no transactions left"; "height" => tip_height); - return Ok(MemPoolWalkResult::Done); - } - }; - if next_height == 0 && tip_height == 0 { - // we're done -- tried every tx - debug!("Done scanning mempool: at height 0"); - return Ok(MemPoolWalkResult::Done); - } - - let next_tips = MemPoolDB::get_chain_tips_at_height(&self.db, next_height)?; - - let ancestor_tip = { - let headers_conn = chainstate.index_conn()?; - let index_block = - StacksBlockHeader::make_index_block_hash(tip_consensus_hash, tip_block_hash); - match StacksChainState::get_index_tip_ancestor_conn( - &headers_conn, - &index_block, - next_height, - )? { - Some(tip_info) => tip_info, - None => { - // no ancestor at the height, this is a error because this shouldn't ever - // happen: a chain tip should have an ancestor at every height < than its own height - error!( - "Done scanning mempool: no known ancestor of tip at height"; - "height" => next_height, - "tip_consensus_hash" => %tip_consensus_hash, - "tip_block_hash" => %tip_block_hash, - "tip_index_hash" => %StacksBlockHeader::make_index_block_hash( - tip_consensus_hash, - tip_block_hash - ), - ); - return Ok(MemPoolWalkResult::Done); - } - } - }; - - // find out which tip is the ancestor tip - let mut found = false; - let mut next_tip_consensus_hash = tip_consensus_hash.clone(); - let mut next_tip_block_hash = tip_block_hash.clone(); - - let ancestor_bh = ancestor_tip.anchored_header.block_hash(); - - for (consensus_hash, block_bhh) in next_tips.into_iter() { - if ancestor_tip.consensus_hash == consensus_hash && ancestor_bh == block_bhh { - found = true; - next_tip_consensus_hash = consensus_hash; - next_tip_block_hash = block_bhh; - break; - } - } - - if !found { - // no ancestor at height, try an earlier height - debug!( - "None of the available prior chain tips at {} is an ancestor of {}/{}", - next_height, tip_consensus_hash, tip_block_hash - ); - return Ok(MemPoolWalkResult::NoneAtHeight( - ancestor_tip.consensus_hash, - ancestor_bh, - tip_height - 1, - )); - } - - let next_timestamp = match MemPoolDB::get_next_timestamp( - &self.db, - &next_tip_consensus_hash, - &next_tip_block_hash, - 0, - )? 
{
-            Some(ts) => ts,
-            None => {
-                unreachable!("No transactions at a chain tip that exists");
-            }
-        };
-
-        debug!(
-            "Will start scaning mempool at {}/{} height={} ts={}",
-            &next_tip_consensus_hash, &next_tip_block_hash, next_height, next_timestamp
-        );
-        Ok(MemPoolWalkResult::Chainstate(
-            next_tip_consensus_hash,
-            next_tip_block_hash,
-            next_height,
-            next_timestamp,
-        ))
-    }
-
     ///
     /// Iterate over candidates in the mempool
     /// todo will be called once for each bundle of transactions at
-    /// each ancestor chain tip from the given one, starting with the
-    /// most recent chain tip and working backwards until there are
+    /// each nonce, starting with the smallest nonce and going higher until there are
     /// no more transactions to consider. Each batch of transactions
-    /// passed to todo will be sorted in nonce order.
-    pub fn iterate_candidates<F, E>(
-        &self,
-        tip_consensus_hash: &ConsensusHash,
-        tip_block_hash: &BlockHeaderHash,
-        tip_height: u64,
-        chainstate: &mut StacksChainState,
-        mut todo: F,
-    ) -> Result<(), E>
+    /// passed to todo will be sorted by sponsor_nonce.
+    ///
+    /// Consider transactions across all forks where the transactions have
+    /// height >= max(0, tip_height - MEMPOOL_MAX_TRANSACTION_AGE) and height <= tip_height.
+    pub fn iterate_candidates<F, E>(&self, tip_height: u64, mut todo: F) -> Result<(), E>
     where
         F: FnMut(Vec<MemPoolTxInfo>) -> Result<(), E>,
         E: From<db_error> + From<ChainstateError>,
     {
-        let (mut tip_consensus_hash, mut tip_block_hash, mut tip_height) = (
-            tip_consensus_hash.clone(),
-            tip_block_hash.clone(),
-            tip_height,
-        );
+        // Want to consider transactions with
+        // height > max(-1, tip_height - (MEMPOOL_MAX_TRANSACTION_AGE + 1))
+        let min_height = tip_height.checked_sub(MEMPOOL_MAX_TRANSACTION_AGE + 1);
+        let mut curr_page = 0;
 
-        debug!(
-            "Begin scanning transaction mempool at {}/{} height={}",
-            &tip_consensus_hash, &tip_block_hash, tip_height
-        );
-
-        let mut next_timestamp =
-            match MemPoolDB::get_next_timestamp(&self.db, &tip_consensus_hash, &tip_block_hash, 0)?
-            {
-                Some(ts) => ts,
-                None => loop {
-                    // walk back to where the first transaction we can mine can be found
-                    match self.walk(chainstate, &tip_consensus_hash, &tip_block_hash, tip_height)? {
-                        MemPoolWalkResult::Chainstate(
-                            next_consensus_hash,
-                            next_block_bhh,
-                            next_height,
-                            next_timestamp,
-                        ) => {
-                            tip_consensus_hash = next_consensus_hash;
-                            tip_block_hash = next_block_bhh;
-                            tip_height = next_height;
-                            break next_timestamp;
-                        }
-                        MemPoolWalkResult::NoneAtHeight(
-                            next_consensus_hash,
-                            next_block_hash,
-                            next_height,
-                        ) => {
-                            if std::env::var("STACKS_MEMPOOL_BAD_BEHAVIOR") == Ok("1".into()) {
-                                warn!(
-                                    "Stopping mempool walk because no mempool entries at height = {}",
-                                    next_height - 1
-                                );
-                                return Ok(());
-                            } else {
-                                tip_consensus_hash = next_consensus_hash;
-                                tip_block_hash = next_block_hash;
-                                tip_height = next_height;
-                            }
-                        }
-                        MemPoolWalkResult::Done => {
-                            return Ok(());
-                        }
-                    }
-                },
+        let mut next_nonce =
+            match MemPoolDB::get_next_nonce(&self.db, min_height, tip_height, None)? {
+                None => {
+                    return Ok(());
+                }
+                Some(nonce) => nonce,
             };
 
         loop {
-            let available_txs = MemPoolDB::get_txs_at(
-                &self.db,
-                &tip_consensus_hash,
-                &tip_block_hash,
-                next_timestamp,
+            let available_txs = MemPoolDB::get_txs_at_nonce_and_offset(
+                &self.db, min_height, tip_height, next_nonce, curr_page,
             )?;
 
             debug!(
-                "Have {} transactions at {}/{} height={} at or after {}",
+                "Have {} transactions at nonce={}",
                 available_txs.len(),
-                &tip_consensus_hash,
-                &tip_block_hash,
-                tip_height,
-                next_timestamp
+                next_nonce,
             );
 
-            todo(available_txs)?;
-            next_timestamp = match MemPoolDB::get_next_timestamp(
-                &self.db,
-                &tip_consensus_hash,
-                &tip_block_hash,
-                next_timestamp,
-            )? {
-                Some(ts) => ts,
-                None => loop {
-                    // walk back
-                    match self.walk(chainstate, &tip_consensus_hash, &tip_block_hash, tip_height)? {
-                        MemPoolWalkResult::Chainstate(
-                            next_consensus_hash,
-                            next_block_bhh,
-                            next_height,
-                            next_timestamp,
-                        ) => {
-                            tip_consensus_hash = next_consensus_hash;
-                            tip_block_hash = next_block_bhh;
-                            tip_height = next_height;
-                            break next_timestamp;
-                        }
-                        MemPoolWalkResult::NoneAtHeight(
-                            next_consensus_hash,
-                            next_block_hash,
-                            next_height,
-                        ) => {
-                            if std::env::var("STACKS_MEMPOOL_BAD_BEHAVIOR") == Ok("1".into()) {
-                                warn!(
-                                    "Stopping mempool walk because no mempool entries at height = {}",
-                                    next_height - 1
-                                );
-                                return Ok(());
-                            } else {
-                                tip_consensus_hash = next_consensus_hash;
-                                tip_block_hash = next_block_hash;
-                                tip_height = next_height;
-                            }
-                        }
-                        MemPoolWalkResult::Done => {
-                            return Ok(());
-                        }
+            if available_txs.len() > 0 {
+                todo(available_txs)?;
+                curr_page += 1;
+            } else {
+                curr_page = 0;
+                next_nonce = match MemPoolDB::get_next_nonce(
+                    &self.db,
+                    min_height,
+                    tip_height,
+                    Some(next_nonce),
+                )? {
+                    None => {
+                        return Ok(());
                     }
-                },
-            };
+                    Some(nonce) => nonce,
+                };
+            }
         }
     }
 
@@ -669,18 +468,70 @@ impl MemPoolDB {
         Ok(rows)
     }
 
-    /// Get the next timestamp after this one that occurs in this chain tip.
-    pub fn get_next_timestamp(
+    /// Get all transactions at a specific block
+    #[cfg(test)]
+    pub fn get_num_tx_at_block(
        conn: &DBConn,
        consensus_hash: &ConsensusHash,
        block_header_hash: &BlockHeaderHash,
-        timestamp: u64,
+    ) -> Result<usize, db_error> {
+        let sql = "SELECT * FROM mempool WHERE consensus_hash = ?1 AND block_header_hash = ?2";
+        let args: &[&dyn ToSql] = &[consensus_hash, block_header_hash];
+        let rows = query_rows::<MemPoolTxInfo, _>(conn, &sql, args)?;
+        Ok(rows.len())
+    }
+
+    /// Get the next origin nonce after this one that occurs in the height range.
+    pub fn get_next_nonce(
+        conn: &DBConn,
+        min_height: Option<u64>,
+        max_height: u64,
+        nonce: Option<u64>,
     ) -> Result<Option<u64>, db_error> {
-        let sql = "SELECT accept_time FROM mempool WHERE accept_time > ?1 AND consensus_hash = ?2 AND block_header_hash = ?3 ORDER BY accept_time ASC LIMIT 1";
-        let args: &[&dyn ToSql] = &[&u64_to_sql(timestamp)?, consensus_hash, block_header_hash];
+        let nonce = match nonce {
+            None => -1,
+            Some(n) => u64_to_sql(n)?,
+        };
+        let min_height_sql_arg = match min_height {
+            None => -1,
+            Some(h) => u64_to_sql(h)?,
+        };
+        let sql = "SELECT origin_nonce FROM mempool WHERE origin_nonce > ?1 AND \
+             height > ?2 AND height <= ?3 ORDER BY origin_nonce, accept_time ASC LIMIT 1";
+
+        let args: &[&dyn ToSql] = &[&nonce, &min_height_sql_arg, &u64_to_sql(max_height)?];
         query_row(conn, sql, args)
     }
 
+    /// Get one page of transactions at a particular origin nonce within the height range.
+    /// Order them by sponsor nonce.
+    pub fn get_txs_at_nonce_and_offset(
+        conn: &DBConn,
+        min_height: Option<u64>,
+        max_height: u64,
+        nonce: u64,
+        curr_page: u32,
+    ) -> Result<Vec<MemPoolTxInfo>, db_error> {
+        let min_height_sql_arg = match min_height {
+            None => -1,
+            Some(h) => u64_to_sql(h)?,
+        };
+        let limit = 200;
+        let offset = limit * curr_page;
+
+        let sql = "SELECT * FROM mempool WHERE origin_nonce = ?1 AND \
+             height > ?2 AND height <= ?3 ORDER BY sponsor_nonce ASC LIMIT ?4 OFFSET ?5";
+        let args: &[&dyn ToSql] = &[
+            &u64_to_sql(nonce)?,
+            &min_height_sql_arg,
+            &u64_to_sql(max_height)?,
+            &limit,
+            &offset,
+        ];
+        let rows = query_rows::<MemPoolTxInfo, _>(conn, &sql, args)?;
+        Ok(rows)
+    }
+
     /// Get all transactions at a particular timestamp on a given chain tip.
     /// Order them by origin nonce.
     pub fn get_txs_at(
@@ -702,29 +553,6 @@ impl MemPoolDB {
         query_row(conn, sql, args)
     }
 
-    /// Get chain tip(s) at a given height that have transactions
-    pub fn get_chain_tips_at_height(
-        conn: &DBConn,
-        height: u64,
-    ) -> Result<Vec<(ConsensusHash, BlockHeaderHash)>, db_error> {
-        let sql = "SELECT consensus_hash,block_header_hash FROM mempool WHERE height = ?1";
-        let args: &[&dyn ToSql] = &[&u64_to_sql(height)?];
-
-        let mut stmt = conn.prepare(sql).map_err(db_error::SqliteError)?;
-
-        let mut rows = stmt.query(args).map_err(db_error::SqliteError)?;
-
-        // gather
-        let mut tips = vec![];
-        while let Some(row) = rows.next().map_err(|e| db_error::SqliteError(e))? {
-            let consensus_hash = ConsensusHash::from_column(&row, "consensus_hash")?;
-            let block_hash = BlockHeaderHash::from_column(&row, "block_header_hash")?;
-            tips.push((consensus_hash, block_hash));
-        }
-
-        Ok(tips)
-    }
-
     /// Get a number of transactions after a given timestamp on a given chain tip.
     pub fn get_txs_after(
         conn: &DBConn,
@@ -798,6 +626,39 @@ impl MemPoolDB {
         Ok(cmp::max(as_origin, as_sponsor))
     }
 
+    fn are_blocks_in_same_fork(
+        chainstate: &mut StacksChainState,
+        first_consensus_hash: &ConsensusHash,
+        first_stacks_block: &BlockHeaderHash,
+        second_consensus_hash: &ConsensusHash,
+        second_stacks_block: &BlockHeaderHash,
+    ) -> Result<bool, db_error> {
+        let first_block =
+            StacksBlockHeader::make_index_block_hash(first_consensus_hash, first_stacks_block);
+        let second_block =
+            StacksBlockHeader::make_index_block_hash(second_consensus_hash, second_stacks_block);
+        // short circuit equality
+        if second_block == first_block {
+            return Ok(true);
+        }
+
+        let headers_conn = &chainstate
+            .index_conn()
+            .map_err(|_e| db_error::Other("ChainstateError".to_string()))?;
+        let height_of_first_with_second_tip =
+            headers_conn.get_ancestor_block_height(&second_block, &first_block)?;
+        let height_of_second_with_first_tip =
+            headers_conn.get_ancestor_block_height(&first_block, &second_block)?;
+
+        match (
+            height_of_first_with_second_tip,
+            height_of_second_with_first_tip,
+        ) {
+            (None, None) => Ok(false),
+            (_, _) => Ok(true),
+        }
+    }
+
     /// Add a transaction to the mempool. If it already exists, then replace it if the given fee
     /// is higher than the one that's already there.
     /// Carry out the mempool admission test before adding.
@@ -840,7 +701,7 @@ impl MemPoolDB {
             // is this a replace-by-fee ?
replace_reason = MemPoolDropReason::REPLACE_BY_FEE; true - } else if !tx.is_block_in_fork( + } else if !MemPoolDB::are_blocks_in_same_fork( chainstate, &prior_tx.consensus_hash, &prior_tx.block_header_hash, @@ -1199,83 +1060,83 @@ mod tests { let _mempool = MemPoolDB::open(false, 0x80000000, &chainstate_path).unwrap(); } - #[test] - fn mempool_walk_over_fork() { - let mut chainstate = instantiate_chainstate_with_balances( - false, - 0x80000000, - "mempool_walk_over_fork", - vec![], - ); - - fn make_block( - chainstate: &mut StacksChainState, - block_consensus: ConsensusHash, - parent: &(ConsensusHash, BlockHeaderHash), - burn_height: u64, - block_height: u64, - ) -> (ConsensusHash, BlockHeaderHash) { - let (mut chainstate_tx, clar_tx) = chainstate.chainstate_tx_begin().unwrap(); - - let anchored_header = StacksBlockHeader { - version: 1, - total_work: StacksWorkScore { - work: block_height, - burn: 1, - }, - proof: VRFProof::empty(), - parent_block: parent.1.clone(), - parent_microblock: BlockHeaderHash([0; 32]), - parent_microblock_sequence: 0, - tx_merkle_root: Sha512Trunc256Sum::empty(), - state_index_root: TrieHash::from_empty_data(), - microblock_pubkey_hash: Hash160([0; 20]), - }; + fn make_block( + chainstate: &mut StacksChainState, + block_consensus: ConsensusHash, + parent: &(ConsensusHash, BlockHeaderHash), + burn_height: u64, + block_height: u64, + ) -> (ConsensusHash, BlockHeaderHash) { + let (mut chainstate_tx, clar_tx) = chainstate.chainstate_tx_begin().unwrap(); - let block_hash = anchored_header.block_hash(); + let anchored_header = StacksBlockHeader { + version: 1, + total_work: StacksWorkScore { + work: block_height, + burn: 1, + }, + proof: VRFProof::empty(), + parent_block: parent.1.clone(), + parent_microblock: BlockHeaderHash([0; 32]), + parent_microblock_sequence: 0, + tx_merkle_root: Sha512Trunc256Sum::empty(), + state_index_root: TrieHash::from_empty_data(), + microblock_pubkey_hash: Hash160([0; 20]), + }; - let c_tx = StacksChainState::chainstate_block_begin( - &chainstate_tx, - clar_tx, - &NULL_BURN_STATE_DB, - &parent.0, - &parent.1, - &block_consensus, - &block_hash, - ); + let block_hash = anchored_header.block_hash(); - let new_tip_info = StacksHeaderInfo { - anchored_header, - microblock_tail: None, - index_root: TrieHash::from_empty_data(), - block_height, - consensus_hash: block_consensus.clone(), - burn_header_hash: BurnchainHeaderHash([0; 32]), - burn_header_height: burn_height as u32, - burn_header_timestamp: 0, - anchored_block_size: 1, - }; + let c_tx = StacksChainState::chainstate_block_begin( + &chainstate_tx, + clar_tx, + &NULL_BURN_STATE_DB, + &parent.0, + &parent.1, + &block_consensus, + &block_hash, + ); - c_tx.commit_block(); + let new_tip_info = StacksHeaderInfo { + anchored_header, + microblock_tail: None, + index_root: TrieHash::from_empty_data(), + block_height, + consensus_hash: block_consensus.clone(), + burn_header_hash: BurnchainHeaderHash([0; 32]), + burn_header_height: burn_height as u32, + burn_header_timestamp: 0, + anchored_block_size: 1, + }; - let new_index_hash = StacksBlockId::new(&block_consensus, &block_hash); + c_tx.commit_block(); - chainstate_tx - .put_indexed_begin(&StacksBlockId::new(&parent.0, &parent.1), &new_index_hash) - .unwrap(); + let new_index_hash = StacksBlockId::new(&block_consensus, &block_hash); - StacksChainState::insert_stacks_block_header( - &mut chainstate_tx, - &new_index_hash, - &new_tip_info, - &ExecutionCost::zero(), - ) + chainstate_tx + .put_indexed_begin(&StacksBlockId::new(&parent.0, &parent.1), 
&new_index_hash) .unwrap(); - chainstate_tx.commit().unwrap(); + StacksChainState::insert_stacks_block_header( + &mut chainstate_tx, + &new_index_hash, + &new_tip_info, + &ExecutionCost::zero(), + ) + .unwrap(); + + chainstate_tx.commit().unwrap(); - (block_consensus, block_hash) - } + (block_consensus, block_hash) + } + + #[test] + fn mempool_walk_over_fork() { + let mut chainstate = instantiate_chainstate_with_balances( + false, + 0x80000000, + "mempool_walk_over_fork", + vec![], + ); // genesis -> b_1* -> b_2* // \-> b_3 -> b_4 @@ -1309,13 +1170,13 @@ mod tests { &TransactionPostConditionMode::Allow, ); - let blocks_to_broadcast_in = [&b_1, &b_2]; - let mut txs = [txs.pop().unwrap(), txs.pop().unwrap()]; + let blocks_to_broadcast_in = [&b_1, &b_2, &b_4]; + let mut txs = [txs.pop().unwrap(), txs.pop().unwrap(), txs.pop().unwrap()]; for tx in txs.iter_mut() { tx.set_tx_fee(123); } - for ix in 0..2 { + for ix in 0..3 { let mut mempool_tx = mempool.tx_begin().unwrap(); let block = &blocks_to_broadcast_in[ix]; @@ -1332,7 +1193,6 @@ mod tests { let txid = tx.txid(); let tx_bytes = tx.serialize_to_vec(); - let tx_fee = tx.get_tx_fee(); let height = 1 + ix as u64; @@ -1373,16 +1233,10 @@ mod tests { let mut count_txs = 0; mempool - .iterate_candidates::<_, ChainstateError>( - &b_2.0, - &b_2.1, - 2, - &mut chainstate, - |available_txs| { - count_txs += available_txs.len(); - Ok(()) - }, - ) + .iterate_candidates::<_, ChainstateError>(2, |available_txs| { + count_txs += available_txs.len(); + Ok(()) + }) .unwrap(); assert_eq!( count_txs, 2, @@ -1391,59 +1245,95 @@ mod tests { let mut count_txs = 0; mempool - .iterate_candidates::<_, ChainstateError>( - &b_5.0, - &b_5.1, - 3, - &mut chainstate, - |available_txs| { - count_txs += available_txs.len(); - Ok(()) - }, - ) + .iterate_candidates::<_, ChainstateError>(3, |available_txs| { + count_txs += available_txs.len(); + Ok(()) + }) .unwrap(); assert_eq!( - count_txs, 2, - "Mempool should find two transactions from b_5" + count_txs, 3, + "Mempool should find three transactions from b_5" ); let mut count_txs = 0; mempool - .iterate_candidates::<_, ChainstateError>( - &b_3.0, - &b_3.1, - 2, - &mut chainstate, - |available_txs| { - count_txs += available_txs.len(); - Ok(()) - }, - ) + .iterate_candidates::<_, ChainstateError>(2, |available_txs| { + count_txs += available_txs.len(); + Ok(()) + }) .unwrap(); assert_eq!( - count_txs, 1, - "Mempool should find one transactions from b_3" + count_txs, 2, + "Mempool should find two transactions from b_3" ); let mut count_txs = 0; mempool - .iterate_candidates::<_, ChainstateError>( - &b_4.0, - &b_4.1, - 3, - &mut chainstate, - |available_txs| { - count_txs += available_txs.len(); - Ok(()) - }, - ) + .iterate_candidates::<_, ChainstateError>(3, |available_txs| { + count_txs += available_txs.len(); + Ok(()) + }) .unwrap(); assert_eq!( - count_txs, 1, - "Mempool should find one transactions from b_4" + count_txs, 3, + "Mempool should find three transactions from b_4" ); // let's test replace-across-fork while we're here. 
+ // first try to replace a tx in b_2 in b_1 - should fail because they are in the same fork + let mut mempool_tx = mempool.tx_begin().unwrap(); + let block = &b_1; + let tx = &txs[1]; + let origin_address = StacksAddress { + version: 22, + bytes: Hash160::from_data(&[0; 32]), + }; + let sponsor_address = StacksAddress { + version: 22, + bytes: Hash160::from_data(&[1; 32]), + }; + + let txid = tx.txid(); + let tx_bytes = tx.serialize_to_vec(); + let tx_fee = tx.get_tx_fee(); + + let height = 3; + let origin_nonce = 1; + let sponsor_nonce = 1; + + // make sure that we already have the transaction we're testing for replace-across-fork + assert!(MemPoolDB::db_has_tx(&mempool_tx, &txid).unwrap()); + + assert!(MemPoolDB::try_add_tx( + &mut mempool_tx, + &mut chainstate, + &block.0, + &block.1, + txid, + tx_bytes, + tx_fee, + height, + &origin_address, + origin_nonce, + &sponsor_address, + sponsor_nonce, + None, + ) + .is_err()); + + assert!(MemPoolDB::db_has_tx(&mempool_tx, &txid).unwrap()); + mempool_tx.commit().unwrap(); + + // now try replace-across-fork from b_2 to b_4 + // check that the number of transactions at b_2 and b_4 starts at 1 each + assert_eq!( + MemPoolDB::get_num_tx_at_block(&mempool.db, &b_4.0, &b_4.1).unwrap(), + 1 + ); + assert_eq!( + MemPoolDB::get_num_tx_at_block(&mempool.db, &b_2.0, &b_2.1).unwrap(), + 1 + ); let mut mempool_tx = mempool.tx_begin().unwrap(); let block = &b_4; let tx = &txs[1]; @@ -1488,60 +1378,14 @@ mod tests { mempool_tx.commit().unwrap(); - // after replace-across-fork, tx[1] should have moved from the b_2->b_5 fork - // to b_4 - let mut count_txs = 0; - mempool - .iterate_candidates::<_, ChainstateError>( - &b_2.0, - &b_2.1, - 2, - &mut chainstate, - |available_txs| { - count_txs += available_txs.len(); - Ok(()) - }, - ) - .unwrap(); - assert_eq!( - count_txs, 1, - "After replace, mempool should find one transactions from b_2" - ); - - let mut count_txs = 0; - mempool - .iterate_candidates::<_, ChainstateError>( - &b_5.0, - &b_5.1, - 3, - &mut chainstate, - |available_txs| { - count_txs += available_txs.len(); - Ok(()) - }, - ) - .unwrap(); + // after replace-across-fork, tx[1] should have moved from the b_2->b_5 fork to b_4 assert_eq!( - count_txs, 1, - "After replace, mempool should find one transactions from b_5" + MemPoolDB::get_num_tx_at_block(&mempool.db, &b_4.0, &b_4.1).unwrap(), + 2 ); - - let mut count_txs = 0; - mempool - .iterate_candidates::<_, ChainstateError>( - &b_4.0, - &b_4.1, - 3, - &mut chainstate, - |available_txs| { - count_txs += available_txs.len(); - Ok(()) - }, - ) - .unwrap(); assert_eq!( - count_txs, 2, - "After replace, mempool should find *two* transactions from b_4" + MemPoolDB::get_num_tx_at_block(&mempool.db, &b_2.0, &b_2.1).unwrap(), + 0 ); } @@ -1556,65 +1400,19 @@ mod tests { // genesis -> b_1 -> b_2 // \-> b_3 - - let b_1 = (ConsensusHash([0x1; 20]), BlockHeaderHash([0x4; 32])); - let b_2 = (ConsensusHash([0x2; 20]), BlockHeaderHash([0x5; 32])); - let b_3 = (ConsensusHash([0x3; 20]), BlockHeaderHash([0x6; 32])); - - eprintln!( - "b_1 => {}", - &StacksBlockHeader::make_index_block_hash(&b_1.0, &b_1.1) - ); - eprintln!( - "b_2 => {}", - &StacksBlockHeader::make_index_block_hash(&b_2.0, &b_2.1) - ); - eprintln!( - "b_3 => {}", - &StacksBlockHeader::make_index_block_hash(&b_3.0, &b_3.1) + // + let b_1 = make_block( + &mut chainstate, + ConsensusHash([0x1; 20]), + &( + FIRST_BURNCHAIN_CONSENSUS_HASH.clone(), + FIRST_STACKS_BLOCK_HASH.clone(), + ), + 1, + 1, ); - - { - let (chainstate_tx, clar_tx) = 
chainstate.chainstate_tx_begin().unwrap(); - let c_tx = StacksChainState::chainstate_block_begin( - &chainstate_tx, - clar_tx, - &NULL_BURN_STATE_DB, - &FIRST_BURNCHAIN_CONSENSUS_HASH, - &FIRST_STACKS_BLOCK_HASH, - &b_1.0, - &b_1.1, - ); - c_tx.commit_block(); - } - - { - let (chainstate_tx, clar_tx) = chainstate.chainstate_tx_begin().unwrap(); - let c_tx = StacksChainState::chainstate_block_begin( - &chainstate_tx, - clar_tx, - &NULL_BURN_STATE_DB, - &FIRST_BURNCHAIN_CONSENSUS_HASH, - &FIRST_STACKS_BLOCK_HASH, - &b_3.0, - &b_3.1, - ); - c_tx.commit_block(); - } - - { - let (chainstate_tx, clar_tx) = chainstate.chainstate_tx_begin().unwrap(); - let c_tx = StacksChainState::chainstate_block_begin( - &chainstate_tx, - clar_tx, - &NULL_BURN_STATE_DB, - &b_1.0, - &b_1.1, - &b_2.0, - &b_2.1, - ); - c_tx.commit_block(); - } + let b_2 = make_block(&mut chainstate, ConsensusHash([0x2; 20]), &b_1, 2, 2); + let b_3 = make_block(&mut chainstate, ConsensusHash([0x3; 20]), &b_1, 1, 1); let chainstate_path = chainstate_path("mempool_do_not_replace_tx"); let mut mempool = MemPoolDB::open(false, 0x80000000, &chainstate_path).unwrap(); diff --git a/src/net/relay.rs b/src/net/relay.rs index c537e144a1..fffb1f3362 100644 --- a/src/net/relay.rs +++ b/src/net/relay.rs @@ -1053,7 +1053,7 @@ impl Relayer { // garbage-collect if chain_height > MEMPOOL_MAX_TRANSACTION_AGE { - let min_height = chain_height - MEMPOOL_MAX_TRANSACTION_AGE; + let min_height = chain_height.saturating_sub(MEMPOOL_MAX_TRANSACTION_AGE); let mut mempool_tx = mempool.tx_begin()?; debug!( diff --git a/testnet/stacks-node/src/tests/integrations.rs b/testnet/stacks-node/src/tests/integrations.rs index 7a725b57ff..5949246dda 100644 --- a/testnet/stacks-node/src/tests/integrations.rs +++ b/testnet/stacks-node/src/tests/integrations.rs @@ -1031,6 +1031,145 @@ fn contract_stx_transfer() { run_loop.start(num_rounds).unwrap(); } +#[test] +fn mine_transactions_out_of_order() { + let mut conf = super::new_test_conf(); + + let sk = StacksPrivateKey::from_hex(SK_3).unwrap(); + let addr = to_addr(&sk); + conf.burnchain.commit_anchor_block_within = 5000; + conf.add_initial_balance(addr.to_string(), 100000); + + let num_rounds = 5; + let mut run_loop = RunLoop::new(conf); + + run_loop + .callbacks + .on_new_tenure(|round, _burnchain_tip, chain_tip, tenure| { + let mut chainstate_copy = tenure.open_chainstate(); + + let sk = StacksPrivateKey::from_hex(SK_3).unwrap(); + let header_hash = chain_tip.block.block_hash(); + let consensus_hash = chain_tip.metadata.consensus_hash; + + let contract_identifier = QualifiedContractIdentifier::parse(&format!( + "{}.{}", + to_addr(&StacksPrivateKey::from_hex(SK_1).unwrap()).to_string(), + "faucet" + )) + .unwrap(); + + if round == 1 { + // block-height = 2 + let xfer_to_contract = + make_stacks_transfer(&sk, 1, 0, &contract_identifier.into(), 1000); + tenure + .mem_pool + .submit_raw( + &mut chainstate_copy, + &consensus_hash, + &header_hash, + xfer_to_contract, + ) + .unwrap(); + } else if round == 2 { + // block-height > 2 + let publish_tx = make_contract_publish(&sk, 2, 0, "faucet", FAUCET_CONTRACT); + tenure + .mem_pool + .submit_raw( + &mut chainstate_copy, + &consensus_hash, + &header_hash, + publish_tx, + ) + .unwrap(); + } else if round == 3 { + let xfer_to_contract = + make_stacks_transfer(&sk, 3, 0, &contract_identifier.into(), 1000); + tenure + .mem_pool + .submit_raw( + &mut chainstate_copy, + &consensus_hash, + &header_hash, + xfer_to_contract, + ) + .unwrap(); + } else if round == 4 { + let xfer_to_contract 
=
+                    make_stacks_transfer(&sk, 0, 0, &contract_identifier.into(), 1000);
+                tenure
+                    .mem_pool
+                    .submit_raw(
+                        &mut chainstate_copy,
+                        &consensus_hash,
+                        &header_hash,
+                        xfer_to_contract,
+                    )
+                    .unwrap();
+            }
+
+            return;
+        });
+
+    run_loop.callbacks.on_new_stacks_chain_state(
+        |round, _burnchain_tip, chain_tip, chain_state, burn_dbconn| {
+            let contract_identifier = QualifiedContractIdentifier::parse(&format!(
+                "{}.{}",
+                to_addr(&StacksPrivateKey::from_hex(SK_1).unwrap()).to_string(),
+                "faucet"
+            ))
+            .unwrap();
+
+            match round {
+                1 => {
+                    assert_eq!(chain_tip.metadata.block_height, 2);
+                    assert_eq!(chain_tip.block.txs.len(), 1);
+                }
+                2 => {
+                    assert_eq!(chain_tip.metadata.block_height, 3);
+                    assert_eq!(chain_tip.block.txs.len(), 1);
+                }
+                3 => {
+                    assert_eq!(chain_tip.metadata.block_height, 4);
+                    assert_eq!(chain_tip.block.txs.len(), 1);
+                }
+                4 => {
+                    assert_eq!(chain_tip.metadata.block_height, 5);
+                    assert_eq!(chain_tip.block.txs.len(), 5);
+
+                    // check that the three 1000-stx transfers (3000 total) were received by the contract principal
+                    let curr_tip = (
+                        chain_tip.metadata.consensus_hash.clone(),
+                        chain_tip.metadata.anchored_header.block_hash(),
+                    );
+                    assert_eq!(
+                        chain_state
+                            .with_read_only_clarity_tx(
+                                burn_dbconn,
+                                &StacksBlockHeader::make_index_block_hash(&curr_tip.0, &curr_tip.1),
+                                |conn| {
+                                    conn.with_clarity_db_readonly(|db| {
+                                        db.get_account_stx_balance(
+                                            &contract_identifier.clone().into(),
+                                        )
+                                        .amount_unlocked
+                                    })
+                                }
+                            )
+                            .unwrap(),
+                        3000
+                    );
+                }
+                _ => {}
+            }
+        },
+    );
+
+    run_loop.start(num_rounds).unwrap();
+}
+
 /// Test mining a smart contract twice (in non-sequential blocks)
 /// this can happen in the testnet leader if they get "behind"
 /// the burnchain and a previously mined block doesn't get included
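
The net effect of the mempool changes above: candidate selection no longer needs a chainstate handle or a fork walk. The walk considers every fork inside a height window, visits origin nonces in ascending order, and pulls fixed-size pages per nonce (ordered by sponsor nonce) until a page comes back empty. The following is a minimal, self-contained sketch of that control flow; the `Tx`/`MemPool` types, the `PAGE_SIZE` and `MEMPOOL_MAX_TRANSACTION_AGE` values, and the helper names are illustrative stand-ins for the sqlite-backed `MemPoolDB`, not the real API.

```rust
// Simplified model of the nonce-ordered, paged mempool walk introduced above.
// `MemPool` is an in-memory stand-in; constants and helper names are illustrative.

const PAGE_SIZE: usize = 200;
const MEMPOOL_MAX_TRANSACTION_AGE: u64 = 256;

#[derive(Clone, Debug)]
struct Tx {
    origin_nonce: u64,
    sponsor_nonce: u64,
    height: u64,
}

struct MemPool {
    txs: Vec<Tx>,
}

impl MemPool {
    /// Smallest origin nonce strictly greater than `after` within the height window.
    fn next_nonce(&self, min_height: Option<u64>, max_height: u64, after: Option<u64>) -> Option<u64> {
        self.txs
            .iter()
            .filter(|t| t.height <= max_height && min_height.map_or(true, |m| t.height > m))
            .map(|t| t.origin_nonce)
            .filter(|n| after.map_or(true, |a| *n > a))
            .min()
    }

    /// One page of transactions at `nonce`, ordered by sponsor nonce.
    fn txs_at_nonce(&self, min_height: Option<u64>, max_height: u64, nonce: u64, page: usize) -> Vec<Tx> {
        let mut matches: Vec<Tx> = self
            .txs
            .iter()
            .filter(|t| t.origin_nonce == nonce)
            .filter(|t| t.height <= max_height && min_height.map_or(true, |m| t.height > m))
            .cloned()
            .collect();
        matches.sort_by_key(|t| t.sponsor_nonce);
        matches.into_iter().skip(page * PAGE_SIZE).take(PAGE_SIZE).collect()
    }

    /// Walk candidates in ascending origin-nonce order, one page at a time.
    fn iterate_candidates<F: FnMut(Vec<Tx>)>(&self, tip_height: u64, mut todo: F) {
        // Same height window as the patch: height > tip_height - (MAX_AGE + 1), height <= tip.
        let min_height = tip_height.checked_sub(MEMPOOL_MAX_TRANSACTION_AGE + 1);
        let mut nonce = match self.next_nonce(min_height, tip_height, None) {
            Some(n) => n,
            None => return,
        };
        let mut page = 0;
        loop {
            let batch = self.txs_at_nonce(min_height, tip_height, nonce, page);
            if !batch.is_empty() {
                todo(batch);
                page += 1;
            } else {
                // Exhausted this nonce: reset paging and advance to the next origin nonce.
                page = 0;
                nonce = match self.next_nonce(min_height, tip_height, Some(nonce)) {
                    Some(n) => n,
                    None => return,
                };
            }
        }
    }
}

fn main() {
    let pool = MemPool {
        txs: vec![
            Tx { origin_nonce: 1, sponsor_nonce: 1, height: 2 },
            Tx { origin_nonce: 0, sponsor_nonce: 0, height: 3 },
            Tx { origin_nonce: 1, sponsor_nonce: 0, height: 3 },
        ],
    };
    // Prints the nonce-0 batch first, then both nonce-1 txs sorted by sponsor nonce.
    pool.iterate_candidates(3, |batch| println!("{:?}", batch));
}
```

In the real patch, `get_next_nonce` and `get_txs_at_nonce_and_offset` issue the SQL shown in `src/core/mempool.rs`, and `iterate_candidates` drives the same next-nonce / next-page loop from the builders in `src/chainstate/stacks/miner.rs`.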