From f2b4184ff0835e87eb4d02772c82e2bded19e435 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 18 Feb 2022 13:44:30 +1000 Subject: [PATCH] refactor(state): add a DiskWriteBatch wrapper for RocksDB writes --- zebra-state/src/service/finalized_state.rs | 15 +++++----- .../src/service/finalized_state/disk_db.rs | 29 +++++++++++++------ 2 files changed, 28 insertions(+), 16 deletions(-) diff --git a/zebra-state/src/service/finalized_state.rs b/zebra-state/src/service/finalized_state.rs index f82d48c643c..2150820487d 100644 --- a/zebra-state/src/service/finalized_state.rs +++ b/zebra-state/src/service/finalized_state.rs @@ -30,8 +30,8 @@ use crate::{ service::{ check, finalized_state::{ - disk_db::{DiskDb, ReadDisk, WriteDisk}, - disk_format::{FromDisk, IntoDisk, TransactionLocation}, + disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk}, + disk_format::{FromDisk, TransactionLocation}, }, QueuedFinalized, }, @@ -299,8 +299,8 @@ impl FinalizedState { // the genesis case. // If the closure returns an error it will be propagated and the batch will not be written // to the BD afterwards. - let prepare_commit = || -> Result { - let mut batch = rocksdb::WriteBatch::default(); + let prepare_commit = || -> Result { + let mut batch = DiskWriteBatch::new(); // Index the block batch.zs_insert(hash_by_height, height, hash); @@ -366,7 +366,7 @@ impl FinalizedState { if let Some(utxo) = self.utxo(outpoint) { all_utxos_spent_by_block.insert(*outpoint, utxo); } - batch.delete_cf(utxo_by_outpoint, outpoint.as_bytes()); + batch.zs_delete(utxo_by_outpoint, outpoint); } // Coinbase inputs represent new coins, // so there are no UTXOs to mark as spent. @@ -726,8 +726,9 @@ impl FinalizedState { #[cfg(any(test, feature = "proptest-impl"))] #[allow(dead_code)] pub fn set_current_value_pool(&self, fake_value_pool: ValueBalance) { - let mut batch = rocksdb::WriteBatch::default(); + let mut batch = DiskWriteBatch::new(); let value_pool_cf = self.db.cf_handle("tip_chain_value_pool").unwrap(); + batch.zs_insert(value_pool_cf, (), fake_value_pool); self.db.write(batch).unwrap(); } @@ -736,7 +737,7 @@ impl FinalizedState { /// referenced in a block, for testing purposes _only_. #[cfg(test)] pub fn populate_with_anchors(&self, block: &Block) { - let mut batch = rocksdb::WriteBatch::default(); + let mut batch = DiskWriteBatch::new(); let sprout_anchors = self.db.cf_handle("sprout_anchors").unwrap(); let sapling_anchors = self.db.cf_handle("sapling_anchors").unwrap(); diff --git a/zebra-state/src/service/finalized_state/disk_db.rs b/zebra-state/src/service/finalized_state/disk_db.rs index 776daa80b45..9ac4052bf8b 100644 --- a/zebra-state/src/service/finalized_state/disk_db.rs +++ b/zebra-state/src/service/finalized_state/disk_db.rs @@ -20,8 +20,7 @@ use crate::{ Config, }; -/// Wrapper struct to ensure low-level disk reads go through -/// the correct API. +/// Wrapper struct to ensure low-level database access goes through the correct API. pub struct DiskDb { /// The inner RocksDB database. db: rocksdb::DB, @@ -32,6 +31,12 @@ pub struct DiskDb { ephemeral: bool, } +/// Wrapper struct to ensure low-level database writes go through the correct API. +pub struct DiskWriteBatch { + /// The inner RocksDB write batch. + batch: rocksdb::WriteBatch, +} + /// Helper trait for inserting (Key, Value) pairs into rocksdb with a consistently /// defined format pub trait WriteDisk { @@ -48,7 +53,7 @@ pub trait WriteDisk { K: IntoDisk + Debug; } -impl WriteDisk for rocksdb::WriteBatch { +impl WriteDisk for DiskWriteBatch { fn zs_insert(&mut self, cf: &rocksdb::ColumnFamily, key: K, value: V) where K: IntoDisk + Debug, @@ -56,7 +61,7 @@ impl WriteDisk for rocksdb::WriteBatch { { let key_bytes = key.as_bytes(); let value_bytes = value.as_bytes(); - self.put_cf(cf, key_bytes, value_bytes); + self.batch.put_cf(cf, key_bytes, value_bytes); } fn zs_delete(&mut self, cf: &rocksdb::ColumnFamily, key: K) @@ -64,7 +69,7 @@ impl WriteDisk for rocksdb::WriteBatch { K: IntoDisk + Debug, { let key_bytes = key.as_bytes(); - self.delete_cf(cf, key_bytes); + self.batch.delete_cf(cf, key_bytes); } } @@ -117,6 +122,14 @@ impl ReadDisk for DiskDb { } } +impl DiskWriteBatch { + pub fn new() -> Self { + DiskWriteBatch { + batch: rocksdb::WriteBatch::default(), + } + } +} + impl DiskDb { /// The ideal open file limit for Zebra const IDEAL_OPEN_FILE_LIMIT: u64 = 1024; @@ -216,11 +229,9 @@ impl DiskDb { } /// Writes `batch` to the database. - /// - /// TODO: replace with type wrapper in next PR. - pub fn write(&self, batch: rocksdb::WriteBatch) -> Result<(), rocksdb::Error> { + pub fn write(&self, batch: DiskWriteBatch) -> Result<(), rocksdb::Error> { // TODO: move writing to the database to a blocking thread (#2188) - self.db.write(batch) + self.db.write(batch.batch) } /// Returns the database options for the finalized state database.