Skip to content

Commit

Permalink
Merge pull request #2514 from blockstack/fix/mempool-walk-order
Browse files Browse the repository at this point in the history
Fix/mempool walk order
  • Loading branch information
pavitthrap authored Apr 26, 2021
2 parents 1cdff84 + 22d4216 commit 148dbd6
Show file tree
Hide file tree
Showing 5 changed files with 532 additions and 625 deletions.
17 changes: 0 additions & 17 deletions src/chainstate/stacks/db/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,23 +279,6 @@ impl StacksChainState {
}
}

/// Get an ancestor block header given an index hash
pub fn get_index_tip_ancestor_conn(
conn: &StacksDBConn,
tip_index_hash: &StacksBlockId,
height: u64,
) -> Result<Option<StacksHeaderInfo>, Error> {
match conn
.get_ancestor_block_hash(height, tip_index_hash)
.map_err(Error::DBError)?
{
Some(bhh) => {
StacksChainState::get_stacks_block_header_info_by_index_block_hash(conn, &bhh)
}
None => Ok(None),
}
}

/// Get a segment of headers from the canonical chain
pub fn get_ancestors_headers(
conn: &Connection,
Expand Down
193 changes: 90 additions & 103 deletions src/chainstate/stacks/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,44 +437,38 @@ impl<'a> StacksMicroblockBuilder<'a> {
let mut bytes_so_far = self.runtime.bytes_so_far;
let mut num_txs = self.runtime.num_mined;

let result = mem_pool.iterate_candidates(
&self.anchor_block_consensus_hash,
&self.anchor_block,
self.anchor_block_height,
&mut self.header_reader,
|micro_txs| {
let mut result = Ok(());
for mempool_tx in micro_txs.into_iter() {
match StacksMicroblockBuilder::mine_next_transaction(
&mut clarity_tx,
mempool_tx.tx.clone(),
mempool_tx.metadata.len,
&mut considered,
bytes_so_far,
) {
Ok(true) => {
bytes_so_far += mempool_tx.metadata.len;

debug!(
"Include tx {} ({}) in microblock",
mempool_tx.tx.txid(),
mempool_tx.tx.payload.name()
);
txs_included.push(mempool_tx.tx);
num_txs += 1;
}
Ok(false) => {
continue;
}
Err(e) => {
result = Err(e);
break;
}
let result = mem_pool.iterate_candidates(self.anchor_block_height, |micro_txs| {
let mut result = Ok(());
for mempool_tx in micro_txs.into_iter() {
match StacksMicroblockBuilder::mine_next_transaction(
&mut clarity_tx,
mempool_tx.tx.clone(),
mempool_tx.metadata.len,
&mut considered,
bytes_so_far,
) {
Ok(true) => {
bytes_so_far += mempool_tx.metadata.len;

debug!(
"Include tx {} ({}) in microblock",
mempool_tx.tx.txid(),
mempool_tx.tx.payload.name()
);
txs_included.push(mempool_tx.tx);
num_txs += 1;
}
Ok(false) => {
continue;
}
Err(e) => {
result = Err(e);
break;
}
}
result
},
);
}
result
});

// do fault injection
if self.runtime.disable_bytes_check {
Expand Down Expand Up @@ -1389,7 +1383,6 @@ impl StacksBlockBuilder {
&tip_consensus_hash, &tip_block_hash, tip_height, execution_budget
);

let (mut header_reader_chainstate, _) = chainstate_handle.reopen()?; // used for reading block headers during an epoch
let (mut chainstate, _) = chainstate_handle.reopen_limited(execution_budget)?; // used for processing a block up to the given limit

let mut builder = StacksBlockBuilder::make_block_builder(
Expand All @@ -1413,86 +1406,80 @@ impl StacksBlockBuilder {

let mut block_limit_hit = BlockLimitFunction::NO_LIMIT_HIT;

let result = mempool.iterate_candidates(
&tip_consensus_hash,
&tip_block_hash,
tip_height,
&mut header_reader_chainstate,
|available_txs| {
if block_limit_hit == BlockLimitFunction::LIMIT_REACHED {
return Ok(());
}
let result = mempool.iterate_candidates(tip_height, |available_txs| {
if block_limit_hit == BlockLimitFunction::LIMIT_REACHED {
return Ok(());
}

for txinfo in available_txs.into_iter() {
// skip transactions early if we can
if considered.contains(&txinfo.tx.txid()) {
for txinfo in available_txs.into_iter() {
// skip transactions early if we can
if considered.contains(&txinfo.tx.txid()) {
continue;
}
if let Some(nonce) = mined_origin_nonces.get(&txinfo.tx.origin_address()) {
if *nonce >= txinfo.tx.get_origin_nonce() {
continue;
}
if let Some(nonce) = mined_origin_nonces.get(&txinfo.tx.origin_address()) {
if *nonce >= txinfo.tx.get_origin_nonce() {
continue;
}
}
if let Some(sponsor_addr) = txinfo.tx.sponsor_address() {
if let Some(nonce) = mined_sponsor_nonces.get(&sponsor_addr) {
if let Some(sponsor_nonce) = txinfo.tx.get_sponsor_nonce() {
if *nonce >= sponsor_nonce {
continue;
}
}
if let Some(sponsor_addr) = txinfo.tx.sponsor_address() {
if let Some(nonce) = mined_sponsor_nonces.get(&sponsor_addr) {
if let Some(sponsor_nonce) = txinfo.tx.get_sponsor_nonce() {
if *nonce >= sponsor_nonce {
continue;
}
}
}
}

considered.insert(txinfo.tx.txid());

match builder.try_mine_tx_with_len(
&mut epoch_tx,
&txinfo.tx,
txinfo.metadata.len,
&block_limit_hit,
) {
Ok(_) => {}
Err(Error::BlockTooBigError) => {
// done mining -- our execution budget is exceeded.
// Make the block from the transactions we did manage to get
debug!("Block budget exceeded on tx {}", &txinfo.tx.txid());
if block_limit_hit == BlockLimitFunction::NO_LIMIT_HIT {
block_limit_hit = BlockLimitFunction::CONTRACT_LIMIT_HIT;
continue;
} else if block_limit_hit == BlockLimitFunction::CONTRACT_LIMIT_HIT {
block_limit_hit = BlockLimitFunction::LIMIT_REACHED;
}
}
Err(Error::TransactionTooBigError) => {
invalidated_txs.push(txinfo.metadata.txid);
if block_limit_hit == BlockLimitFunction::NO_LIMIT_HIT {
block_limit_hit = BlockLimitFunction::CONTRACT_LIMIT_HIT;
continue;
} else if block_limit_hit == BlockLimitFunction::CONTRACT_LIMIT_HIT {
block_limit_hit = BlockLimitFunction::LIMIT_REACHED;
}
}
Err(Error::InvalidStacksTransaction(_, true)) => {
// if we have an invalid transaction that was quietly ignored, don't warn here either
considered.insert(txinfo.tx.txid());

match builder.try_mine_tx_with_len(
&mut epoch_tx,
&txinfo.tx,
txinfo.metadata.len,
&block_limit_hit,
) {
Ok(_) => {}
Err(Error::BlockTooBigError) => {
// done mining -- our execution budget is exceeded.
// Make the block from the transactions we did manage to get
debug!("Block budget exceeded on tx {}", &txinfo.tx.txid());
if block_limit_hit == BlockLimitFunction::NO_LIMIT_HIT {
block_limit_hit = BlockLimitFunction::CONTRACT_LIMIT_HIT;
continue;
} else if block_limit_hit == BlockLimitFunction::CONTRACT_LIMIT_HIT {
block_limit_hit = BlockLimitFunction::LIMIT_REACHED;
}
Err(e) => {
warn!("Failed to apply tx {}: {:?}", &txinfo.tx.txid(), &e);
}
Err(Error::TransactionTooBigError) => {
invalidated_txs.push(txinfo.metadata.txid);
if block_limit_hit == BlockLimitFunction::NO_LIMIT_HIT {
block_limit_hit = BlockLimitFunction::CONTRACT_LIMIT_HIT;
continue;
} else if block_limit_hit == BlockLimitFunction::CONTRACT_LIMIT_HIT {
block_limit_hit = BlockLimitFunction::LIMIT_REACHED;
}
}

mined_origin_nonces
.insert(txinfo.tx.origin_address(), txinfo.tx.get_origin_nonce());
if let (Some(sponsor_addr), Some(sponsor_nonce)) =
(txinfo.tx.sponsor_address(), txinfo.tx.get_sponsor_nonce())
{
mined_sponsor_nonces.insert(sponsor_addr, sponsor_nonce);
Err(Error::InvalidStacksTransaction(_, true)) => {
// if we have an invalid transaction that was quietly ignored, don't warn here either
continue;
}
Err(e) => {
warn!("Failed to apply tx {}: {:?}", &txinfo.tx.txid(), &e);
continue;
}
}
Ok(())
},
);

mined_origin_nonces
.insert(txinfo.tx.origin_address(), txinfo.tx.get_origin_nonce());
if let (Some(sponsor_addr), Some(sponsor_nonce)) =
(txinfo.tx.sponsor_address(), txinfo.tx.get_sponsor_nonce())
{
mined_sponsor_nonces.insert(sponsor_addr, sponsor_nonce);
}
}
Ok(())
});

mempool.drop_txs(&invalidated_txs)?;
if let Some(observer) = event_observer {
Expand Down
Loading

0 comments on commit 148dbd6

Please sign in to comment.