Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tidy up and restructure of pool unit-tests #3507

Merged
merged 6 commits into from
Feb 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
332 changes: 156 additions & 176 deletions massa-pool-worker/src/tests/operation_pool_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,19 @@
//! latest period given his own thread. All operation which doesn't fit these
//! requirements are "irrelevant"
//!
use super::tools::{create_some_operations, operation_pool_test};
use crate::operation_pool::OperationPool;
use massa_execution_exports::test_exports::{
MockExecutionController, MockExecutionControllerMessage,
};
use massa_models::{
address::Address,
amount::Amount,
operation::{Operation, OperationSerializer, OperationType, SecureShareOperation},
prehash::PreHashMap,
secure_share::SecureShareContent,
slot::Slot,
};
use massa_pool_exports::{PoolChannels, PoolConfig};
use massa_signature::KeyPair;
use massa_storage::Storage;
use std::{str::FromStr, time::Duration};
use tokio::sync::broadcast;
use crate::tests::tools::OpGenerator;

use super::tools::{create_some_operations, operation_pool_test, pool_test};
use massa_execution_exports::test_exports::MockExecutionControllerMessage;
use massa_models::{amount::Amount, slot::Slot};
use massa_pool_exports::PoolConfig;
use std::time::Duration;

#[test]
fn test_add_operation() {
operation_pool_test(PoolConfig::default(), |mut operation_pool, mut storage| {
storage.store_operations(create_some_operations(10, &KeyPair::generate(), 2));
let op_gen = OpGenerator::default().expirery(2);
storage.store_operations(create_some_operations(10, &op_gen));
operation_pool.add_operations(storage);
assert_eq!(operation_pool.storage.get_op_refs().len(), 10);
});
Expand All @@ -52,174 +42,164 @@ fn test_add_irrelevant_operation() {
let pool_config = PoolConfig::default();
let thread_count = pool_config.thread_count;
operation_pool_test(PoolConfig::default(), |mut operation_pool, mut storage| {
storage.store_operations(create_some_operations(10, &KeyPair::generate(), 1));
let op_gen = OpGenerator::default().expirery(2);
storage.store_operations(create_some_operations(10, &op_gen));
operation_pool.notify_final_cs_periods(&vec![51; thread_count.into()]);
operation_pool.add_operations(storage);
assert_eq!(operation_pool.storage.get_op_refs().len(), 0);
});
}

fn get_transaction(expire_period: u64, fee: u64) -> SecureShareOperation {
let sender_keypair = KeyPair::generate();

let recv_keypair = KeyPair::generate();

let op = OperationType::Transaction {
recipient_address: Address::from_public_key(&recv_keypair.get_public_key()),
amount: Amount::default(),
};
let content = Operation {
fee: Amount::from_str(&fee.to_string()).unwrap(),
op,
expire_period,
};
Operation::new_verifiable(content, OperationSerializer::new(), &sender_keypair).unwrap()
}

/// TODO refactor old tests
#[test]
fn test_pool() {
let (execution_controller, execution_receiver) = MockExecutionController::new_with_receiver();
let pool_config = PoolConfig::default();
let storage_base = Storage::create_root();
let operation_sender = broadcast::channel(pool_config.broadcast_operations_capacity).0;
let mut pool = OperationPool::init(
pool_test(
pool_config,
&storage_base,
execution_controller,
PoolChannels { operation_sender },
);
// generate (id, transactions, range of validity) by threads
let mut thread_tx_lists = vec![Vec::new(); pool_config.thread_count as usize];
for i in 0..18 {
let fee = 40 + i;
let expire_period: u64 = 40 + i;
let start_period = expire_period.saturating_sub(pool_config.operation_validity_periods);
let op = get_transaction(expire_period, fee);
let id = op.id;

let mut ops = PreHashMap::default();
ops.insert(id, op.clone());
let mut storage = storage_base.clone_without_refs();
storage.store_operations(ops.values().cloned().collect());
pool.add_operations(storage);
//TODO: compare
// assert_eq!(storage.get_op_refs(), &Set::<OperationId>::default());

// duplicate
let mut storage = storage_base.clone_without_refs();
storage.store_operations(ops.values().cloned().collect());
pool.add_operations(storage);
//TODO: compare
//assert_eq!(storage.get_op_refs(), &ops.keys().copied().collect::<Set<OperationId>>());

let op_thread = op
.content_creator_address
.get_thread(pool_config.thread_count);
thread_tx_lists[op_thread as usize].push((op, start_period..=expire_period));
}
std::thread::spawn(move || loop {
match execution_receiver.recv_timeout(Duration::from_millis(100)) {
// forward on the operations
Ok(MockExecutionControllerMessage::UnexecutedOpsAmong {
ops, response_tx, ..
}) => {
response_tx.send(ops).unwrap();
|mut pool_manager, mut pool, execution_receiver, storage_base| {
// generate (id, transactions, range of validity) by threads
let mut thread_tx_lists = vec![Vec::new(); pool_config.thread_count as usize];

for i in 0..18 {
let expire_period: u64 = 40 + i;
let op = OpGenerator::default()
Comment on lines +63 to +65
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally this code could be handled by a helper method, as similar work is done in the other test functions. This is not so straight forward, as there are subtle differences between the tests that would make this doable, but in my view "clever for the sake of clever", and not of significant benefit in this context.

.expirery(expire_period)
.fee(Amount::from_raw(40 + i))
.generate(); //get_transaction(expire_period, fee);

let mut storage = storage_base.clone_without_refs();
storage.store_operations(vec![op.clone()]);
pool.add_operations(storage);

//TODO: compare
// assert_eq!(storage.get_op_refs(), &Set::<OperationId>::default());

// duplicate
// let mut storage = storage_base.clone_without_refs();
// storage.store_operations(vec![op.clone()]);
// pool.add_operations(storage);
//TODO: compare
//assert_eq!(storage.get_op_refs(), &ops.keys().copied().collect::<Set<OperationId>>());

let op_thread = op
.content_creator_address
.get_thread(pool_config.thread_count);

let start_period =
expire_period.saturating_sub(pool_config.operation_validity_periods);

thread_tx_lists[op_thread as usize].push((op, start_period..=expire_period));
}
// we want the operations to be paid for...
Ok(MockExecutionControllerMessage::GetFinalAndCandidateBalance {
response_tx, ..
}) => response_tx
.send(vec![(
Some(Amount::from_raw(60 * 1000000000)),
Some(Amount::from_raw(60 * 1000000000)),
)])
.unwrap(),
_ => {}
}
});
std::thread::spawn(move || loop {
match execution_receiver.recv_timeout(Duration::from_millis(100)) {
// forward on the operations
Ok(MockExecutionControllerMessage::UnexecutedOpsAmong {
ops,
response_tx,
..
}) => {
response_tx.send(ops).unwrap();
}
// we want the operations to be paid for...
Ok(MockExecutionControllerMessage::GetFinalAndCandidateBalance {
response_tx,
..
}) => response_tx
.send(vec![(
Some(Amount::from_raw(60 * 1_000_000_000)),
Some(Amount::from_raw(60 * 1_000_000_000)),
)])
.unwrap(),
_ => {}
}
});

// sort from bigger fee to smaller and truncate
for lst in thread_tx_lists.iter_mut() {
lst.reverse();
lst.truncate(pool_config.max_operation_pool_size_per_thread);
}

// checks ops are the expected ones for thread 0 and 1 and various periods
for thread in 0u8..pool_config.thread_count {
for period in 0u64..70 {
let target_slot = Slot::new(period, thread);
let max_count = 3;
let (ids, storage) = pool.get_block_operations(&target_slot);

assert!(ids
.iter()
.map(|id| (
*id,
storage
.read_operations()
.get(id)
.unwrap()
.serialized_data
.clone()
))
.eq(thread_tx_lists[target_slot.thread as usize]
.iter()
.filter(|(_, r)| r.contains(&target_slot.period))
.take(max_count)
.map(|(op, _)| (op.id, op.serialized_data.clone()))));
}
}

// op ending before or at period 45 won't appear in the block due to incompatible validity range
// we don't keep them as expected ops
let final_period = 45u64;
pool.notify_final_cs_periods(&vec![final_period; pool_config.thread_count as usize]);
for lst in thread_tx_lists.iter_mut() {
lst.retain(|(op, _)| op.content.expire_period > final_period);
}

// checks ops are the expected ones for thread 0 and 1 and various periods
for thread in 0u8..pool_config.thread_count {
for period in 0u64..70 {
let target_slot = Slot::new(period, thread);
let max_count = 4;
let (ids, storage) = pool.get_block_operations(&target_slot);
assert!(ids
.iter()
.map(|id| (
*id,
storage
.read_operations()
.get(id)
.unwrap()
.serialized_data
.clone()
))
.eq(thread_tx_lists[target_slot.thread as usize]
.iter()
.filter(|(_, r)| r.contains(&target_slot.period))
.take(max_count)
.map(|(op, _)| (op.id, op.serialized_data.clone()))));
}
}

// add transactions with a high fee but too much in the future: should be ignored
{
//TODO: update current slot
//pool.update_current_slot(Slot::new(10, 0));
let fee = 1000;
let expire_period: u64 = 300;
let op = get_transaction(expire_period, fee);
let mut storage = storage_base.clone_without_refs();
storage.store_operations(vec![op.clone()]);
pool.add_operations(storage);
//TODO: compare
//assert_eq!(storage.get_op_refs(), &Set::<OperationId>::default());
let op_thread = op
.content_creator_address
.get_thread(pool_config.thread_count);
let (ids, _) = pool.get_block_operations(&Slot::new(expire_period - 1, op_thread));
assert!(ids.is_empty());
}
// sort from bigger fee to smaller and truncate
for lst in thread_tx_lists.iter_mut() {
lst.reverse();
lst.truncate(pool_config.max_operation_pool_size_per_thread);
}

// checks ops are the expected ones for thread 0 and 1 and various periods
for thread in 0u8..pool_config.thread_count {
for period in 0u64..70 {
let target_slot = Slot::new(period, thread);
let max_count = 3;
let (ids, storage) = pool.get_block_operations(&target_slot);

assert!(ids
.iter()
.map(|id| (
*id,
storage
.read_operations()
.get(id)
.unwrap()
.serialized_data
.clone()
))
.eq(thread_tx_lists[target_slot.thread as usize]
.iter()
.filter(|(_, r)| r.contains(&target_slot.period))
.take(max_count)
.map(|(op, _)| (op.id, op.serialized_data.clone()))));
}
}

// op ending before or at period 45 won't appear in the block due to incompatible validity range
// we don't keep them as expected ops
let final_period = 45u64;
pool.notify_final_cs_periods(&vec![final_period; pool_config.thread_count as usize]);
for lst in thread_tx_lists.iter_mut() {
lst.retain(|(op, _)| op.content.expire_period > final_period);
}

// checks ops are the expected ones for thread 0 and 1 and various periods
for thread in 0u8..pool_config.thread_count {
for period in 0u64..70 {
let target_slot = Slot::new(period, thread);
let max_count = 4;
let (ids, storage) = pool.get_block_operations(&target_slot);
assert!(ids
.iter()
.map(|id| (
*id,
storage
.read_operations()
.get(id)
.unwrap()
.serialized_data
.clone()
))
.eq(thread_tx_lists[target_slot.thread as usize]
.iter()
.filter(|(_, r)| r.contains(&target_slot.period))
.take(max_count)
.map(|(op, _)| (op.id, op.serialized_data.clone()))));
}
}

// add transactions with a high fee but too much in the future: should be ignored
{
//TODO: update current slot
//pool.update_current_slot(Slot::new(10, 0));
let expire_period: u64 = 300;
let op = OpGenerator::default()
.expirery(expire_period)
.fee(Amount::from_raw(1000))
.generate();
let mut storage = storage_base.clone_without_refs();
storage.store_operations(vec![op.clone()]);
pool.add_operations(storage);
//TODO: compare
//assert_eq!(storage.get_op_refs(), &Set::<OperationId>::default());
let op_thread = op
.content_creator_address
.get_thread(pool_config.thread_count);
let (ids, _) = pool.get_block_operations(&Slot::new(expire_period - 1, op_thread));
assert!(ids.is_empty());
}
pool_manager.stop();
},
);
}
Loading