Skip to content

Commit

Permalink
add file and db backend for dump-block feature (#4693)
Browse files Browse the repository at this point in the history
Signed-off-by: Jean-François <jfm@laposte.net>
  • Loading branch information
bilboquet authored May 16, 2024
1 parent 6f9acc3 commit a949f65
Show file tree
Hide file tree
Showing 11 changed files with 242 additions and 36 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ async-trait = "0.1"
bitvec = "1.0"
blake3 = "=1.5"
bs58 = "=0.5"
cfg-if = "1.0.0"
clap = { version = "4.4", features = ["derive", "cargo"] }
config = "0.13"
console = "0.15"
Expand Down
25 changes: 13 additions & 12 deletions massa-execution-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ gas_calibration = [
"massa_final_state/test-exports",
"massa_pos_worker",
"massa_db_worker",
"tempfile"
"tempfile",
]
test-exports = [
"massa_execution_exports/test-exports",
Expand All @@ -29,26 +29,26 @@ test-exports = [
"massa_metrics/test-exports",
"massa_metrics/test-exports",
"massa_db_worker",
"tempfile"
"tempfile",
]
benchmarking = [
"massa-sc-runtime/gas_calibration",
"criterion",
"massa_pos_worker",
"massa_db_worker",
"tempfile"
"tempfile",
]
metrics = []
execution-trace = [
"massa_execution_exports/execution-trace",
]
execution-trace = ["massa_execution_exports/execution-trace"]
dump-block = [
"prost",
"massa_execution_exports/dump-block"
]
execution-info = [
"execution-trace"
"massa_execution_exports/dump-block",
"db_storage_backend",
]
db_storage_backend = []
file_storage_backend = []

execution-info = ["execution-trace"]

[dependencies]
anyhow = { workspace = true }
Expand Down Expand Up @@ -90,7 +90,8 @@ massa_wallet = { workspace = true }
massa-proto-rs = { workspace = true }
schnellru = { workspace = true }
prost = { version = "=0.12", optional = true }
cfg-if = "1.0.0"
cfg-if = { workspace = true }
rocksdb = { workspace = true }

[dev-dependencies]
massa_storage = { workspace = true }
Expand All @@ -104,7 +105,7 @@ massa_wallet = { workspace = true, features = ["test-exports"] }
massa_metrics = { workspace = true, features = ["test-exports"] }
massa_db_worker = { workspace = true }
tempfile = { workspace = true }
massa_test_framework = {workspace = true, "features" = ["test-exports"]}
massa_test_framework = { workspace = true, "features" = ["test-exports"] }
tokio = { workspace = true, features = ["sync"] }
hex-literal = { workspace = true }
mockall = { workspace = true }
25 changes: 12 additions & 13 deletions massa-execution-worker/src/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use crate::active_history::{ActiveHistory, HistorySearchResult};
use crate::context::{ExecutionContext, ExecutionContextSnapshot};
use crate::interface_impl::InterfaceImpl;
use crate::stats::ExecutionStatsCounter;
#[cfg(feature = "dump-block")]
use crate::storage_backend::StorageBackend;
use massa_async_pool::AsyncMessage;
use massa_execution_exports::{
EventStore, ExecutedBlockInfo, ExecutionBlockMetadata, ExecutionChannels, ExecutionConfig,
Expand Down Expand Up @@ -70,8 +72,6 @@ use massa_models::secure_share::SecureShare;
use massa_proto_rs::massa::model::v1 as grpc_model;
#[cfg(feature = "dump-block")]
use prost::Message;
#[cfg(feature = "dump-block")]
use std::io::Write;

/// Used to acquire a lock on the execution context
macro_rules! context_guard {
Expand Down Expand Up @@ -134,6 +134,8 @@ pub(crate) struct ExecutionState {
pub(crate) trace_history: Arc<RwLock<TraceHistory>>,
#[cfg(feature = "execution-info")]
pub(crate) execution_info: Arc<RwLock<ExecutionInfo>>,
#[cfg(feature = "dump-block")]
block_storage_backend: Arc<RwLock<dyn StorageBackend>>,
}

impl ExecutionState {
Expand All @@ -145,6 +147,7 @@ impl ExecutionState {
///
/// # returns
/// A new `ExecutionState`
#[allow(clippy::too_many_arguments)]
pub fn new(
config: ExecutionConfig,
final_state: Arc<RwLock<dyn FinalStateController>>,
Expand All @@ -153,6 +156,7 @@ impl ExecutionState {
channels: ExecutionChannels,
wallet: Arc<RwLock<Wallet>>,
massa_metrics: MassaMetrics,
#[cfg(feature = "dump-block")] block_storage_backend: Arc<RwLock<dyn StorageBackend>>,
) -> ExecutionState {
// Get the slot at the output of which the final state is attached.
// This should be among the latest final slots.
Expand Down Expand Up @@ -225,6 +229,8 @@ impl ExecutionState {
config.max_execution_traces_slot_limit as u32,
))),
config,
#[cfg(feature = "dump-block")]
block_storage_backend,
}
}

Expand Down Expand Up @@ -349,15 +355,6 @@ impl ExecutionState {

#[cfg(feature = "dump-block")]
{
let block_folder = &self.config.block_dump_folder_path;
let block_file_path = block_folder.join(format!(
"block_slot_{}_{}.bin",
exec_out.slot.thread, exec_out.slot.period
));

let mut fs = std::fs::File::create(block_file_path.clone())
.unwrap_or_else(|_| panic!("Cannot create file: {:?}", block_file_path));

let mut block_ser = vec![];
if let Some(block_info) = exec_out.block_info {
let block_id = block_info.block_id;
Expand Down Expand Up @@ -390,8 +387,10 @@ impl ExecutionState {
let grpc_filled_block = grpc_model::FilledBlock::from(filled_block);
grpc_filled_block.encode(&mut block_ser).unwrap();
}
fs.write_all(&block_ser[..])
.expect("Unable to write block to disk");

self.block_storage_backend
.write()
.write(&exec_out.slot, &block_ser);
}
}

Expand Down
3 changes: 3 additions & 0 deletions massa-execution-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ mod speculative_executed_ops;
mod speculative_ledger;
mod speculative_roll_state;
mod stats;
/// Provide abstraction and implementations of a storage backend for the the
/// dump-block feature
pub mod storage_backend;
mod worker;

#[cfg(feature = "execution-trace")]
Expand Down
135 changes: 135 additions & 0 deletions massa-execution-worker/src/storage_backend.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
use std::{
fs::File,
io::{Read, Write},
path::PathBuf,
};

use massa_models::slot::Slot;
use rocksdb::{DBCompressionType, Options};

/// A trait that defines the interface for a storage backend for the dump-block
/// feature.
pub trait StorageBackend: Send + Sync {
/// Writes the given value to the storage backend.
/// The slot is used as the key to the value.
fn write(&self, slot: &Slot, value: &[u8]);

/// Reads the value from the storage backend.
/// The slot is used as the key to the value.
fn read(&self, slot: &Slot) -> Option<Vec<u8>>;
}

/// A storage backend that uses the file system as the underlying storage engine.
pub struct FileStorageBackend {
folder: PathBuf,
}
impl FileStorageBackend {
/// Creates a new instance of `FileStorageBackend` with the given path.
pub fn new(path: PathBuf) -> Self {
Self { folder: path }
}
}

impl StorageBackend for FileStorageBackend {
fn write(&self, slot: &Slot, value: &[u8]) {
let block_file_path = self
.folder
.join(format!("block_slot_{}_{}.bin", slot.thread, slot.period));

let mut file = File::create(block_file_path.clone())
.unwrap_or_else(|_| panic!("Cannot create file: {:?}", block_file_path));

file.write_all(value).expect("Unable to write to disk");
}

fn read(&self, slot: &Slot) -> Option<Vec<u8>> {
let block_file_path = self
.folder
.join(format!("block_slot_{}_{}.bin", slot.thread, slot.period));

let file = File::open(block_file_path.clone())
.unwrap_or_else(|_| panic!("Cannot open file: {:?}", block_file_path));
let mut reader = std::io::BufReader::new(file);
let mut buffer = Vec::new();
reader
.read_to_end(&mut buffer)
.expect("Unable to read from disk");

Some(buffer)
}
}

/// A storage backend that uses RocksDB as the underlying storage engine.
pub struct RocksDBStorageBackend {
db: rocksdb::DB,
}

impl RocksDBStorageBackend {
/// Creates a new instance of `RocksDBStorageBackend` with the given path.
pub fn new(path: PathBuf) -> Self {
let mut opts = Options::default();
opts.create_if_missing(true);
opts.set_compression_type(DBCompressionType::Lz4);

let db = rocksdb::DB::open(&opts, path.clone())
.unwrap_or_else(|_| panic!("Failed to create storage db at {:?}", path));

Self { db }
}
}

impl StorageBackend for RocksDBStorageBackend {
fn write(&self, slot: &Slot, value: &[u8]) {
self.db
.put(slot.to_bytes_key(), value)
.expect("Unable to write block to db");
}

fn read(&self, slot: &Slot) -> Option<Vec<u8>> {
match self.db.get(slot.to_bytes_key()) {
Ok(val) => val,
Err(e) => {
println!("Error: {} reading key {:?}", e, slot.to_bytes_key());
None
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_file_storage_backend() {
let slot = Slot {
thread: 1,
period: 1,
};
let value = vec![1, 2, 3];

let storage = FileStorageBackend::new(PathBuf::from(""));
storage.write(&slot, &value);

let storage = FileStorageBackend::new(PathBuf::from(""));
let data = storage.read(&slot);
assert_eq!(data, Some(value));
}

#[test]
fn test_rocksdb_storage_backend() {
let slot = Slot {
thread: 1,
period: 1,
};
let value = vec![1, 2, 3];

let storage = RocksDBStorageBackend::new(PathBuf::from("test_db"));
storage.write(&slot, &value);
drop(storage);

let storage = RocksDBStorageBackend::new(PathBuf::from("test_db"));
let data = storage.read(&slot);
assert_eq!(data, Some(value));
}
}
19 changes: 14 additions & 5 deletions massa-execution-worker/src/tests/scenarios_mandatories.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2852,6 +2852,8 @@ fn execution_trace_nested() {
#[cfg(feature = "dump-block")]
#[test]
fn test_dump_block() {
use crate::storage_backend::StorageBackend;

// setup the period duration
let exec_cfg = ExecutionConfig::default();
let mut foreign_controllers = ExecutionForeignControllers::new_with_mocks();
Expand Down Expand Up @@ -2931,13 +2933,20 @@ fn test_dump_block() {

std::thread::sleep(Duration::from_secs(1));

// if the the storage backend for the dump-block feature is a rocksdb, this
// is mandatory (the db must be closed before we can reopen it to ckeck the
// data)
drop(universe);

let block_folder = &exec_cfg.block_dump_folder_path;
let block_file_path = block_folder.join(format!(
"block_slot_{}_{}.bin",
block_slot.thread, block_slot.period
));
#[cfg(feature = "file_storage_backend")]
let storage_backend = crate::storage_backend::FileStorageBackend::new(block_folder.to_owned());

#[cfg(feature = "db_storage_backend")]
let storage_backend =
crate::storage_backend::RocksDBStorageBackend::new(block_folder.to_owned());

let block_content = std::fs::read(block_file_path).expect("Unable to read block dump");
let block_content = storage_backend.read(&block_slot).unwrap();
let filled_block = FilledBlock::decode(&mut Cursor::new(block_content)).unwrap();
let header_content = filled_block.header.unwrap().content.unwrap();
let header_slot = header_content.slot.unwrap();
Expand Down
12 changes: 12 additions & 0 deletions massa-execution-worker/src/tests/universe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ use std::{
sync::Arc,
};

#[cfg(feature = "file_storage_backend")]
use crate::storage_backend::FileStorageBackend;
#[cfg(feature = "db_storage_backend")]
use crate::storage_backend::RocksDBStorageBackend;
use massa_db_exports::{MassaDBConfig, MassaDBController, ShareableMassaDBController};
use massa_db_worker::MassaDB;
use massa_execution_exports::{
Expand Down Expand Up @@ -117,6 +121,14 @@ impl TestUniverse for ExecutionTestUniverse {
std::time::Duration::from_secs(5),
)
.0,
#[cfg(feature = "file_storage_backend")]
Arc::new(RwLock::new(FileStorageBackend::new(
config.block_dump_folder_path.clone(),
))),
#[cfg(feature = "db_storage_backend")]
Arc::new(RwLock::new(RocksDBStorageBackend::new(
config.block_dump_folder_path.clone(),
))),
);
init_execution_worker(&config, &storage, module_controller.clone());
let universe = Self {
Expand Down
Loading

0 comments on commit a949f65

Please sign in to comment.