From 32017f992b75c95ea19ab46485721c3d42457673 Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 22 Feb 2022 22:59:44 +1000 Subject: [PATCH] 2. refactor(state): move all RocksDB API calls to the disk_db module (#3578) * refactor(state): move RocksDB-specific initialization to a new module * refactor(state): move RocksDB-specific shutdown to a new module * refactor(state): temporarily allow RocksDB-specific reads and writes, without a new module Unlike the last few commits, this one actually compiles. * refactor(state): add a DiskWriteBatch wrapper for RocksDB writes * refactor(state): move finalized state test methods to a test module --- zebra-state/src/config.rs | 136 +------ zebra-state/src/lib.rs | 3 + zebra-state/src/service/finalized_state.rs | 218 ++--------- .../src/service/finalized_state/arbitrary.rs | 57 ++- .../src/service/finalized_state/disk_db.rs | 348 +++++++++++++++++- .../service/finalized_state/disk_format.rs | 5 + 6 files changed, 433 insertions(+), 334 deletions(-) diff --git a/zebra-state/src/config.rs b/zebra-state/src/config.rs index ad9aaea9fbb..50bc3d94eef 100644 --- a/zebra-state/src/config.rs +++ b/zebra-state/src/config.rs @@ -1,8 +1,6 @@ -use std::{convert::TryInto, path::PathBuf}; +use std::path::PathBuf; -use rlimit::increase_nofile_limit; use serde::{Deserialize, Serialize}; -use tracing::{info, warn}; use zebra_chain::parameters::Network; @@ -57,34 +55,14 @@ fn gen_temp_path(prefix: &str) -> PathBuf { } impl Config { - /// The ideal open file limit for Zebra - const IDEAL_OPEN_FILE_LIMIT: u64 = 1024; - - /// The minimum number of open files for Zebra to operate normally. Also used - /// as the default open file limit, when the OS doesn't tell us how many - /// files we can use. - /// - /// We want 100+ file descriptors for peers, and 100+ for the database. - /// - /// On Windows, the default limit is 512 high-level I/O files, and 8192 - /// low-level I/O files: - /// https://docs.microsoft.com/en-us/cpp/c-runtime-library/reference/setmaxstdio?view=msvc-160#remarks - const MIN_OPEN_FILE_LIMIT: u64 = 512; - - /// The number of files used internally by Zebra. - /// - /// Zebra uses file descriptors for OS libraries (10+), polling APIs (10+), - /// stdio (3), and other OS facilities (2+). - const RESERVED_FILE_COUNT: u64 = 48; - - /// Returns the path and database options for the finalized state database - pub(crate) fn db_config(&self, network: Network) -> (PathBuf, rocksdb::Options) { + /// Returns the path for the finalized state database + pub(crate) fn db_path(&self, network: Network) -> PathBuf { let net_dir = match network { Network::Mainnet => "mainnet", Network::Testnet => "testnet", }; - let path = if self.ephemeral { + if self.ephemeral { gen_temp_path(&format!( "zebra-state-v{}-{}", crate::constants::DATABASE_FORMAT_VERSION, @@ -95,25 +73,7 @@ impl Config { .join("state") .join(format!("v{}", crate::constants::DATABASE_FORMAT_VERSION)) .join(net_dir) - }; - - let mut opts = rocksdb::Options::default(); - - opts.create_if_missing(true); - opts.create_missing_column_families(true); - - let open_file_limit = Config::increase_open_file_limit(); - let db_file_limit = Config::get_db_open_file_limit(open_file_limit); - - // If the current limit is very large, set the DB limit using the ideal limit - let ideal_limit = Config::get_db_open_file_limit(Config::IDEAL_OPEN_FILE_LIMIT) - .try_into() - .expect("ideal open file limit fits in a c_int"); - let db_file_limit = db_file_limit.try_into().unwrap_or(ideal_limit); - - opts.set_max_open_files(db_file_limit); - - (path, opts) + } } /// Construct a config for an ephemeral database @@ -123,92 +83,6 @@ impl Config { ..Config::default() } } - - /// Calculate the database's share of `open_file_limit` - fn get_db_open_file_limit(open_file_limit: u64) -> u64 { - // Give the DB half the files, and reserve half the files for peers - (open_file_limit - Config::RESERVED_FILE_COUNT) / 2 - } - - /// Increase the open file limit for this process to `IDEAL_OPEN_FILE_LIMIT`. - /// If that fails, try `MIN_OPEN_FILE_LIMIT`. - /// - /// If the current limit is above `IDEAL_OPEN_FILE_LIMIT`, leaves it - /// unchanged. - /// - /// Returns the current limit, after any successful increases. - /// - /// # Panics - /// - /// If the open file limit can not be increased to `MIN_OPEN_FILE_LIMIT`. - fn increase_open_file_limit() -> u64 { - // `increase_nofile_limit` doesn't do anything on Windows in rlimit 0.7.0. - // - // On Windows, the default limit is: - // - 512 high-level stream I/O files (via the C standard functions), and - // - 8192 low-level I/O files (via the Unix C functions). - // https://docs.microsoft.com/en-us/cpp/c-runtime-library/reference/setmaxstdio?view=msvc-160#remarks - // - // If we need more high-level I/O files on Windows, - // use `setmaxstdio` and `getmaxstdio` from the `rlimit` crate: - // https://docs.rs/rlimit/latest/rlimit/#windows - // - // Then panic if `setmaxstdio` fails to set the minimum value, - // and `getmaxstdio` is below the minimum value. - - // We try setting the ideal limit, then the minimum limit. - let current_limit = match increase_nofile_limit(Config::IDEAL_OPEN_FILE_LIMIT) { - Ok(current_limit) => current_limit, - Err(limit_error) => { - info!( - ?limit_error, - min_limit = ?Config::MIN_OPEN_FILE_LIMIT, - ideal_limit = ?Config::IDEAL_OPEN_FILE_LIMIT, - "unable to increase the open file limit, \ - assuming Zebra can open a minimum number of files" - ); - - return Config::MIN_OPEN_FILE_LIMIT; - } - }; - - if current_limit < Config::MIN_OPEN_FILE_LIMIT { - panic!( - "open file limit too low: \ - unable to set the number of open files to {}, \ - the minimum number of files required by Zebra. \ - Current limit is {:?}. \ - Hint: Increase the open file limit to {} before launching Zebra", - Config::MIN_OPEN_FILE_LIMIT, - current_limit, - Config::IDEAL_OPEN_FILE_LIMIT - ); - } else if current_limit < Config::IDEAL_OPEN_FILE_LIMIT { - warn!( - ?current_limit, - min_limit = ?Config::MIN_OPEN_FILE_LIMIT, - ideal_limit = ?Config::IDEAL_OPEN_FILE_LIMIT, - "the maximum number of open files is below Zebra's ideal limit. \ - Hint: Increase the open file limit to {} before launching Zebra", - Config::IDEAL_OPEN_FILE_LIMIT - ); - } else if cfg!(windows) { - info!( - min_limit = ?Config::MIN_OPEN_FILE_LIMIT, - ideal_limit = ?Config::IDEAL_OPEN_FILE_LIMIT, - "assuming the open file limit is high enough for Zebra", - ); - } else { - info!( - ?current_limit, - min_limit = ?Config::MIN_OPEN_FILE_LIMIT, - ideal_limit = ?Config::IDEAL_OPEN_FILE_LIMIT, - "the open file limit is high enough for Zebra", - ); - } - - current_limit - } } impl Default for Config { diff --git a/zebra-state/src/lib.rs b/zebra-state/src/lib.rs index a8f80441a80..dd0b16852ba 100644 --- a/zebra-state/src/lib.rs +++ b/zebra-state/src/lib.rs @@ -12,6 +12,9 @@ #![doc(html_logo_url = "https://www.zfnd.org/images/zebra-icon.png")] #![doc(html_root_url = "https://doc.zebra.zfnd.org/zebra_state")] +#[macro_use] +extern crate tracing; + #[cfg(any(test, feature = "proptest-impl"))] mod arbitrary; mod config; diff --git a/zebra-state/src/service/finalized_state.rs b/zebra-state/src/service/finalized_state.rs index 78dc9850bea..eeda8840300 100644 --- a/zebra-state/src/service/finalized_state.rs +++ b/zebra-state/src/service/finalized_state.rs @@ -1,4 +1,9 @@ //! The primary implementation of the `zebra_state::Service` built upon rocksdb +//! +//! # Correctness +//! +//! The [`crate::constants::DATABASE_FORMAT_VERSION`] constant must +//! be incremented each time the database format (column, serialization, etc) changes. use std::{ borrow::Borrow, @@ -25,8 +30,8 @@ use crate::{ service::{ check, finalized_state::{ - disk_db::{ReadDisk, WriteDisk}, - disk_format::{FromDisk, IntoDisk, TransactionLocation}, + disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk}, + disk_format::{FromDisk, TransactionLocation}, }, QueuedFinalized, }, @@ -44,87 +49,44 @@ mod tests; /// The finalized part of the chain state, stored in the db. pub struct FinalizedState { + /// The underlying database. + db: DiskDb, + /// Queued blocks that arrived out of order, indexed by their parent block hash. queued_by_prev_hash: HashMap, + /// A metric tracking the maximum height that's currently in `queued_by_prev_hash` /// /// Set to `f64::NAN` if `queued_by_prev_hash` is empty, because grafana shows NaNs /// as a break in the graph. max_queued_height: f64, - db: rocksdb::DB, - ephemeral: bool, + /// The configured stop height. + /// /// Commit blocks to the finalized state up to this height, then exit Zebra. debug_stop_at_height: Option, + /// The configured network. network: Network, } impl FinalizedState { pub fn new(config: &Config, network: Network) -> Self { - let (path, db_options) = config.db_config(network); - // Note: The [`crate::constants::DATABASE_FORMAT_VERSION`] constant must - // be incremented each time the database format (column, serialization, etc) changes. - let column_families = vec![ - rocksdb::ColumnFamilyDescriptor::new("hash_by_height", db_options.clone()), - rocksdb::ColumnFamilyDescriptor::new("height_by_hash", db_options.clone()), - rocksdb::ColumnFamilyDescriptor::new("block_by_height", db_options.clone()), - rocksdb::ColumnFamilyDescriptor::new("tx_by_hash", db_options.clone()), - rocksdb::ColumnFamilyDescriptor::new("utxo_by_outpoint", db_options.clone()), - rocksdb::ColumnFamilyDescriptor::new("sprout_nullifiers", db_options.clone()), - rocksdb::ColumnFamilyDescriptor::new("sapling_nullifiers", db_options.clone()), - rocksdb::ColumnFamilyDescriptor::new("orchard_nullifiers", db_options.clone()), - rocksdb::ColumnFamilyDescriptor::new("sprout_anchors", db_options.clone()), - rocksdb::ColumnFamilyDescriptor::new("sapling_anchors", db_options.clone()), - rocksdb::ColumnFamilyDescriptor::new("orchard_anchors", db_options.clone()), - rocksdb::ColumnFamilyDescriptor::new("sprout_note_commitment_tree", db_options.clone()), - rocksdb::ColumnFamilyDescriptor::new( - "sapling_note_commitment_tree", - db_options.clone(), - ), - rocksdb::ColumnFamilyDescriptor::new( - "orchard_note_commitment_tree", - db_options.clone(), - ), - rocksdb::ColumnFamilyDescriptor::new("history_tree", db_options.clone()), - rocksdb::ColumnFamilyDescriptor::new("tip_chain_value_pool", db_options.clone()), - ]; - let db_result = rocksdb::DB::open_cf_descriptors(&db_options, &path, column_families); - - let db = match db_result { - Ok(d) => { - tracing::info!("Opened Zebra state cache at {}", path.display()); - d - } - // TODO: provide a different hint if the disk is full, see #1623 - Err(e) => panic!( - "Opening database {:?} failed: {:?}. \ - Hint: Check if another zebrad process is running. \ - Try changing the state cache_dir in the Zebra config.", - path, e, - ), - }; + let db = DiskDb::new(config, network); let new_state = Self { queued_by_prev_hash: HashMap::new(), max_queued_height: f64::NAN, db, - ephemeral: config.ephemeral, debug_stop_at_height: config.debug_stop_at_height.map(block::Height), network, }; - // TODO: remove these extra logs once bugs like #2905 are fixed - tracing::info!("reading cached tip height"); if let Some(tip_height) = new_state.finalized_tip_height() { - tracing::info!(?tip_height, "loaded cached tip height"); - if new_state.is_at_stop_height(tip_height) { let debug_stop_at_height = new_state .debug_stop_at_height .expect("true from `is_at_stop_height` implies `debug_stop_at_height` is Some"); - - tracing::info!("reading cached tip hash"); let tip_hash = new_state.finalized_tip_hash(); if tip_height > debug_stop_at_height { @@ -145,7 +107,6 @@ impl FinalizedState { // RocksDB can do a cleanup when column families are opened. // So we want to drop it before we exit. - tracing::info!("closing cached state"); std::mem::drop(new_state); Self::exit_process(); @@ -232,14 +193,6 @@ impl FinalizedState { self.tip().map(|(height, _)| height) } - fn is_empty(&self, cf: &rocksdb::ColumnFamily) -> bool { - // use iterator to check if it's empty - !self - .db - .iterator_cf(cf, rocksdb::IteratorMode::Start) - .valid() - } - /// Immediately commit `finalized` to the finalized state. /// /// This can be called either by the non-finalized state (when finalizing @@ -285,7 +238,7 @@ impl FinalizedState { let tip_chain_value_pool = self.db.cf_handle("tip_chain_value_pool").unwrap(); // Assert that callers (including unit tests) get the chain order correct - if self.is_empty(hash_by_height) { + if self.db.is_empty(hash_by_height) { assert_eq!( GENESIS_PREVIOUS_BLOCK_HASH, finalized.block.header.previous_block_hash, "the first block added to an empty state must be a genesis block, source: {}", @@ -346,8 +299,8 @@ impl FinalizedState { // the genesis case. // If the closure returns an error it will be propagated and the batch will not be written // to the BD afterwards. - let prepare_commit = || -> Result { - let mut batch = rocksdb::WriteBatch::default(); + let prepare_commit = || -> Result { + let mut batch = DiskWriteBatch::new(); // Index the block batch.zs_insert(hash_by_height, height, hash); @@ -413,7 +366,7 @@ impl FinalizedState { if let Some(utxo) = self.utxo(outpoint) { all_utxos_spent_by_block.insert(*outpoint, utxo); } - batch.delete_cf(utxo_by_outpoint, outpoint.as_bytes()); + batch.zs_delete(utxo_by_outpoint, outpoint); } // Coinbase inputs represent new coins, // so there are no UTXOs to mark as spent. @@ -505,6 +458,7 @@ impl FinalizedState { tracing::trace!(?source, "committed block from"); + // TODO: move the stop height check to the syncer (#3442) if result.is_ok() && self.is_at_stop_height(height) { tracing::info!(?source, "committed block from"); tracing::info!( @@ -513,9 +467,8 @@ impl FinalizedState { "stopping at configured height, flushing database to disk" ); - self.shutdown(); + self.db.shutdown(); - // TODO: replace with a graceful shutdown (#1678) Self::exit_process(); } @@ -525,7 +478,8 @@ impl FinalizedState { /// Exit the host process. /// /// Designed for debugging and tests. - /// TODO: replace with a graceful shutdown (#1678) + /// + /// TODO: move the stop height check to the syncer (#3442) fn exit_process() -> ! { tracing::info!("exiting Zebra"); @@ -582,7 +536,7 @@ impl FinalizedState { pub fn tip(&self) -> Option<(block::Height, block::Hash)> { let hash_by_height = self.db.cf_handle("hash_by_height").unwrap(); self.db - .iterator_cf(hash_by_height, rocksdb::IteratorMode::End) + .reverse_iterator(hash_by_height) .next() .map(|(height_bytes, hash_bytes)| { let height = block::Height::from_bytes(height_bytes); @@ -754,32 +708,6 @@ impl FinalizedState { } } - /// If the database is `ephemeral`, delete it. - fn delete_ephemeral(&self) { - if self.ephemeral { - let path = self.db.path(); - tracing::info!(cache_path = ?path, "removing temporary database files"); - - // We'd like to use `rocksdb::Env::mem_env` for ephemeral databases, - // but the Zcash blockchain might not fit in memory. So we just - // delete the database files instead. - // - // We'd like to call `DB::destroy` here, but calling destroy on a - // live DB is undefined behaviour: - // https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ#basic-readwrite - // - // So we assume that all the database files are under `path`, and - // delete them using standard filesystem APIs. Deleting open files - // might cause errors on non-Unix platforms, so we ignore the result. - // (The OS will delete them eventually anyway.) - let res = std::fs::remove_dir_all(path); - - // TODO: downgrade to debug once bugs like #2905 are fixed - // but leave any errors at "info" level - tracing::info!(?res, "removed temporary database files"); - } - } - /// Returns the `Path` where the files used by this database are located. #[allow(dead_code)] pub fn path(&self) -> &Path { @@ -793,104 +721,6 @@ impl FinalizedState { .zs_get(value_pool_cf, &()) .unwrap_or_else(ValueBalance::zero) } - - /// Allow to set up a fake value pool in the database for testing purposes. - #[cfg(any(test, feature = "proptest-impl"))] - #[allow(dead_code)] - pub fn set_current_value_pool(&self, fake_value_pool: ValueBalance) { - let mut batch = rocksdb::WriteBatch::default(); - let value_pool_cf = self.db.cf_handle("tip_chain_value_pool").unwrap(); - batch.zs_insert(value_pool_cf, (), fake_value_pool); - self.db.write(batch).unwrap(); - } - - /// Artificially prime the note commitment tree anchor sets with anchors - /// referenced in a block, for testing purposes _only_. - #[cfg(test)] - pub fn populate_with_anchors(&self, block: &Block) { - let mut batch = rocksdb::WriteBatch::default(); - - let sprout_anchors = self.db.cf_handle("sprout_anchors").unwrap(); - let sapling_anchors = self.db.cf_handle("sapling_anchors").unwrap(); - let orchard_anchors = self.db.cf_handle("orchard_anchors").unwrap(); - - for transaction in block.transactions.iter() { - // Sprout - for joinsplit in transaction.sprout_groth16_joinsplits() { - batch.zs_insert( - sprout_anchors, - joinsplit.anchor, - sprout::tree::NoteCommitmentTree::default(), - ); - } - - // Sapling - for anchor in transaction.sapling_anchors() { - batch.zs_insert(sapling_anchors, anchor, ()); - } - - // Orchard - if let Some(orchard_shielded_data) = transaction.orchard_shielded_data() { - batch.zs_insert(orchard_anchors, orchard_shielded_data.shared_anchor, ()); - } - } - - self.db.write(batch).unwrap(); - } - - /// Shut down the database, cleaning up background tasks and ephemeral data. - fn shutdown(&mut self) { - // Drop isn't guaranteed to run, such as when we panic, or if the tokio shutdown times out. - // - // Zebra's data should be fine if we don't clean up, because: - // - the database flushes regularly anyway - // - Zebra commits each block in a database transaction, any incomplete blocks get rolled back - // - ephemeral files are placed in the os temp dir and should be cleaned up automatically eventually - tracing::info!("flushing database to disk"); - self.db.flush().expect("flush is successful"); - - // But we should call `cancel_all_background_work` before Zebra exits. - // If we don't, we see these kinds of errors: - // ``` - // pthread lock: Invalid argument - // pure virtual method called - // terminate called without an active exception - // pthread destroy mutex: Device or resource busy - // Aborted (core dumped) - // ``` - // - // The RocksDB wiki says: - // > Q: Is it safe to close RocksDB while another thread is issuing read, write or manual compaction requests? - // > - // > A: No. The users of RocksDB need to make sure all functions have finished before they close RocksDB. - // > You can speed up the waiting by calling CancelAllBackgroundWork(). - // - // https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ - tracing::info!("stopping background database tasks"); - self.db.cancel_all_background_work(true); - - // We'd like to drop the database before deleting its files, - // because that closes the column families and the database correctly. - // But Rust's ownership rules make that difficult, - // so we just flush and delete ephemeral data instead. - // - // The RocksDB wiki says: - // > rocksdb::DB instances need to be destroyed before your main function exits. - // > RocksDB instances usually depend on some internal static variables. - // > Users need to make sure rocksdb::DB instances are destroyed before those static variables. - // - // https://github.com/facebook/rocksdb/wiki/Known-Issues - // - // But our current code doesn't seem to cause any issues. - // We might want to explicitly drop the database as part of graceful shutdown (#1678). - self.delete_ephemeral(); - } -} - -impl Drop for FinalizedState { - fn drop(&mut self) { - self.shutdown(); - } } fn block_precommit_metrics(block: &Block, hash: block::Hash, height: block::Height) { diff --git a/zebra-state/src/service/finalized_state/arbitrary.rs b/zebra-state/src/service/finalized_state/arbitrary.rs index 054d5063b5e..4adcf0395a8 100644 --- a/zebra-state/src/service/finalized_state/arbitrary.rs +++ b/zebra-state/src/service/finalized_state/arbitrary.rs @@ -6,9 +6,18 @@ use std::sync::Arc; use proptest::prelude::*; -use zebra_chain::block; +use zebra_chain::{ + amount::NonNegative, + block::{self, Block}, + sprout, + value_balance::ValueBalance, +}; -use crate::service::finalized_state::disk_format::{FromDisk, IntoDisk, TransactionLocation}; +use crate::service::finalized_state::{ + disk_db::{DiskWriteBatch, WriteDisk}, + disk_format::{FromDisk, IntoDisk, TransactionLocation}, + FinalizedState, +}; impl Arbitrary for TransactionLocation { type Parameters = (); @@ -84,3 +93,47 @@ where assert_round_trip_arc(Arc::new(input.clone())); assert_round_trip(input); } + +impl FinalizedState { + /// Allow to set up a fake value pool in the database for testing purposes. + pub fn set_current_value_pool(&self, fake_value_pool: ValueBalance) { + let mut batch = DiskWriteBatch::new(); + let value_pool_cf = self.db.cf_handle("tip_chain_value_pool").unwrap(); + + batch.zs_insert(value_pool_cf, (), fake_value_pool); + self.db.write(batch).unwrap(); + } + + /// Artificially prime the note commitment tree anchor sets with anchors + /// referenced in a block, for testing purposes _only_. + pub fn populate_with_anchors(&self, block: &Block) { + let mut batch = DiskWriteBatch::new(); + + let sprout_anchors = self.db.cf_handle("sprout_anchors").unwrap(); + let sapling_anchors = self.db.cf_handle("sapling_anchors").unwrap(); + let orchard_anchors = self.db.cf_handle("orchard_anchors").unwrap(); + + for transaction in block.transactions.iter() { + // Sprout + for joinsplit in transaction.sprout_groth16_joinsplits() { + batch.zs_insert( + sprout_anchors, + joinsplit.anchor, + sprout::tree::NoteCommitmentTree::default(), + ); + } + + // Sapling + for anchor in transaction.sapling_anchors() { + batch.zs_insert(sapling_anchors, anchor, ()); + } + + // Orchard + if let Some(orchard_shielded_data) = transaction.orchard_shielded_data() { + batch.zs_insert(orchard_anchors, orchard_shielded_data.shared_anchor, ()); + } + } + + self.db.write(batch).unwrap(); + } +} diff --git a/zebra-state/src/service/finalized_state/disk_db.rs b/zebra-state/src/service/finalized_state/disk_db.rs index 6b92a9d9cb0..9ac4052bf8b 100644 --- a/zebra-state/src/service/finalized_state/disk_db.rs +++ b/zebra-state/src/service/finalized_state/disk_db.rs @@ -3,10 +3,39 @@ //! This module makes sure that: //! - all disk writes happen inside a RocksDB transaction, and //! - format-specific invariants are maintained. +//! +//! # Correctness +//! +//! The [`crate::constants::DATABASE_FORMAT_VERSION`] constant must +//! be incremented each time the database format (column, serialization, etc) changes. + +use std::{fmt::Debug, path::Path}; + +use rlimit::increase_nofile_limit; -use std::fmt::Debug; +use zebra_chain::parameters::Network; + +use crate::{ + service::finalized_state::disk_format::{FromDisk, IntoDisk}, + Config, +}; + +/// Wrapper struct to ensure low-level database access goes through the correct API. +pub struct DiskDb { + /// The inner RocksDB database. + db: rocksdb::DB, + + /// The configured temporary database setting. + /// + /// If true, the database files are deleted on drop. + ephemeral: bool, +} -use crate::service::finalized_state::disk_format::{FromDisk, IntoDisk}; +/// Wrapper struct to ensure low-level database writes go through the correct API. +pub struct DiskWriteBatch { + /// The inner RocksDB write batch. + batch: rocksdb::WriteBatch, +} /// Helper trait for inserting (Key, Value) pairs into rocksdb with a consistently /// defined format @@ -24,7 +53,7 @@ pub trait WriteDisk { K: IntoDisk + Debug; } -impl WriteDisk for rocksdb::WriteBatch { +impl WriteDisk for DiskWriteBatch { fn zs_insert(&mut self, cf: &rocksdb::ColumnFamily, key: K, value: V) where K: IntoDisk + Debug, @@ -32,7 +61,7 @@ impl WriteDisk for rocksdb::WriteBatch { { let key_bytes = key.as_bytes(); let value_bytes = value.as_bytes(); - self.put_cf(cf, key_bytes, value_bytes); + self.batch.put_cf(cf, key_bytes, value_bytes); } fn zs_delete(&mut self, cf: &rocksdb::ColumnFamily, key: K) @@ -40,7 +69,7 @@ impl WriteDisk for rocksdb::WriteBatch { K: IntoDisk + Debug, { let key_bytes = key.as_bytes(); - self.delete_cf(cf, key_bytes); + self.batch.delete_cf(cf, key_bytes); } } @@ -59,7 +88,7 @@ pub trait ReadDisk { K: IntoDisk; } -impl ReadDisk for rocksdb::DB { +impl ReadDisk for DiskDb { fn zs_get(&self, cf: &rocksdb::ColumnFamily, key: &K) -> Option where K: IntoDisk, @@ -71,6 +100,7 @@ impl ReadDisk for rocksdb::DB { // value, because we're going to deserialize it anyways, which avoids an // extra copy let value_bytes = self + .db .get_pinned_cf(cf, key_bytes) .expect("expected that disk errors would not occur"); @@ -85,8 +115,312 @@ impl ReadDisk for rocksdb::DB { // We use `get_pinned_cf` to avoid taking ownership of the serialized // value, because we don't use the value at all. This avoids an extra copy. - self.get_pinned_cf(cf, key_bytes) + self.db + .get_pinned_cf(cf, key_bytes) .expect("expected that disk errors would not occur") .is_some() } } + +impl DiskWriteBatch { + pub fn new() -> Self { + DiskWriteBatch { + batch: rocksdb::WriteBatch::default(), + } + } +} + +impl DiskDb { + /// The ideal open file limit for Zebra + const IDEAL_OPEN_FILE_LIMIT: u64 = 1024; + + /// The minimum number of open files for Zebra to operate normally. Also used + /// as the default open file limit, when the OS doesn't tell us how many + /// files we can use. + /// + /// We want 100+ file descriptors for peers, and 100+ for the database. + /// + /// On Windows, the default limit is 512 high-level I/O files, and 8192 + /// low-level I/O files: + /// https://docs.microsoft.com/en-us/cpp/c-runtime-library/reference/setmaxstdio?view=msvc-160#remarks + const MIN_OPEN_FILE_LIMIT: u64 = 512; + + /// The number of files used internally by Zebra. + /// + /// Zebra uses file descriptors for OS libraries (10+), polling APIs (10+), + /// stdio (3), and other OS facilities (2+). + const RESERVED_FILE_COUNT: u64 = 48; + + pub fn new(config: &Config, network: Network) -> DiskDb { + let path = config.db_path(network); + let db_options = DiskDb::options(); + + let column_families = vec![ + rocksdb::ColumnFamilyDescriptor::new("hash_by_height", db_options.clone()), + rocksdb::ColumnFamilyDescriptor::new("height_by_hash", db_options.clone()), + rocksdb::ColumnFamilyDescriptor::new("block_by_height", db_options.clone()), + rocksdb::ColumnFamilyDescriptor::new("tx_by_hash", db_options.clone()), + rocksdb::ColumnFamilyDescriptor::new("utxo_by_outpoint", db_options.clone()), + rocksdb::ColumnFamilyDescriptor::new("sprout_nullifiers", db_options.clone()), + rocksdb::ColumnFamilyDescriptor::new("sapling_nullifiers", db_options.clone()), + rocksdb::ColumnFamilyDescriptor::new("orchard_nullifiers", db_options.clone()), + rocksdb::ColumnFamilyDescriptor::new("sprout_anchors", db_options.clone()), + rocksdb::ColumnFamilyDescriptor::new("sapling_anchors", db_options.clone()), + rocksdb::ColumnFamilyDescriptor::new("orchard_anchors", db_options.clone()), + rocksdb::ColumnFamilyDescriptor::new("sprout_note_commitment_tree", db_options.clone()), + rocksdb::ColumnFamilyDescriptor::new( + "sapling_note_commitment_tree", + db_options.clone(), + ), + rocksdb::ColumnFamilyDescriptor::new( + "orchard_note_commitment_tree", + db_options.clone(), + ), + rocksdb::ColumnFamilyDescriptor::new("history_tree", db_options.clone()), + rocksdb::ColumnFamilyDescriptor::new("tip_chain_value_pool", db_options.clone()), + ]; + + // TODO: move opening the database to a blocking thread (#2188) + let db_result = rocksdb::DB::open_cf_descriptors(&db_options, &path, column_families); + + match db_result { + Ok(db) => { + info!("Opened Zebra state cache at {}", path.display()); + + DiskDb { + db, + ephemeral: config.ephemeral, + } + } + // TODO: provide a different hint if the disk is full, see #1623 + Err(e) => panic!( + "Opening database {:?} failed: {:?}. \ + Hint: Check if another zebrad process is running. \ + Try changing the state cache_dir in the Zebra config.", + path, e, + ), + } + } + + /// Returns the `Path` where the files used by this database are located. + pub fn path(&self) -> &Path { + self.db.path() + } + + /// Returns the column family handle for `cf_name`. + pub fn cf_handle(&self, cf_name: &str) -> Option<&rocksdb::ColumnFamily> { + self.db.cf_handle(cf_name) + } + + /// Returns an iterator over the keys in `cf_name`, starting from the first key. + pub fn forward_iterator(&self, cf_handle: &rocksdb::ColumnFamily) -> rocksdb::DBIterator { + self.db.iterator_cf(cf_handle, rocksdb::IteratorMode::Start) + } + + /// Returns a reverse iterator over the keys in `cf_name`, starting from the last key. + pub fn reverse_iterator(&self, cf_handle: &rocksdb::ColumnFamily) -> rocksdb::DBIterator { + self.db.iterator_cf(cf_handle, rocksdb::IteratorMode::End) + } + + /// Returns true if `cf` does not contain any entries. + pub fn is_empty(&self, cf_handle: &rocksdb::ColumnFamily) -> bool { + // Empty column families return invalid iterators. + !self.forward_iterator(cf_handle).valid() + } + + /// Writes `batch` to the database. + pub fn write(&self, batch: DiskWriteBatch) -> Result<(), rocksdb::Error> { + // TODO: move writing to the database to a blocking thread (#2188) + self.db.write(batch.batch) + } + + /// Returns the database options for the finalized state database. + fn options() -> rocksdb::Options { + let mut opts = rocksdb::Options::default(); + + opts.create_if_missing(true); + opts.create_missing_column_families(true); + + let open_file_limit = DiskDb::increase_open_file_limit(); + let db_file_limit = DiskDb::get_db_open_file_limit(open_file_limit); + + // If the current limit is very large, set the DB limit using the ideal limit + let ideal_limit = DiskDb::get_db_open_file_limit(DiskDb::IDEAL_OPEN_FILE_LIMIT) + .try_into() + .expect("ideal open file limit fits in a c_int"); + let db_file_limit = db_file_limit.try_into().unwrap_or(ideal_limit); + + opts.set_max_open_files(db_file_limit); + + opts + } + + /// Calculate the database's share of `open_file_limit` + fn get_db_open_file_limit(open_file_limit: u64) -> u64 { + // Give the DB half the files, and reserve half the files for peers + (open_file_limit - DiskDb::RESERVED_FILE_COUNT) / 2 + } + + /// Increase the open file limit for this process to `IDEAL_OPEN_FILE_LIMIT`. + /// If that fails, try `MIN_OPEN_FILE_LIMIT`. + /// + /// If the current limit is above `IDEAL_OPEN_FILE_LIMIT`, leaves it + /// unchanged. + /// + /// Returns the current limit, after any successful increases. + /// + /// # Panics + /// + /// If the open file limit can not be increased to `MIN_OPEN_FILE_LIMIT`. + fn increase_open_file_limit() -> u64 { + // `increase_nofile_limit` doesn't do anything on Windows in rlimit 0.7.0. + // + // On Windows, the default limit is: + // - 512 high-level stream I/O files (via the C standard functions), and + // - 8192 low-level I/O files (via the Unix C functions). + // https://docs.microsoft.com/en-us/cpp/c-runtime-library/reference/setmaxstdio?view=msvc-160#remarks + // + // If we need more high-level I/O files on Windows, + // use `setmaxstdio` and `getmaxstdio` from the `rlimit` crate: + // https://docs.rs/rlimit/latest/rlimit/#windows + // + // Then panic if `setmaxstdio` fails to set the minimum value, + // and `getmaxstdio` is below the minimum value. + + // We try setting the ideal limit, then the minimum limit. + let current_limit = match increase_nofile_limit(DiskDb::IDEAL_OPEN_FILE_LIMIT) { + Ok(current_limit) => current_limit, + Err(limit_error) => { + info!( + ?limit_error, + min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT, + ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT, + "unable to increase the open file limit, \ + assuming Zebra can open a minimum number of files" + ); + + return DiskDb::MIN_OPEN_FILE_LIMIT; + } + }; + + if current_limit < DiskDb::MIN_OPEN_FILE_LIMIT { + panic!( + "open file limit too low: \ + unable to set the number of open files to {}, \ + the minimum number of files required by Zebra. \ + Current limit is {:?}. \ + Hint: Increase the open file limit to {} before launching Zebra", + DiskDb::MIN_OPEN_FILE_LIMIT, + current_limit, + DiskDb::IDEAL_OPEN_FILE_LIMIT + ); + } else if current_limit < DiskDb::IDEAL_OPEN_FILE_LIMIT { + warn!( + ?current_limit, + min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT, + ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT, + "the maximum number of open files is below Zebra's ideal limit. \ + Hint: Increase the open file limit to {} before launching Zebra", + DiskDb::IDEAL_OPEN_FILE_LIMIT + ); + } else if cfg!(windows) { + info!( + min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT, + ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT, + "assuming the open file limit is high enough for Zebra", + ); + } else { + info!( + ?current_limit, + min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT, + ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT, + "the open file limit is high enough for Zebra", + ); + } + + current_limit + } + + /// Shut down the database, cleaning up background tasks and ephemeral data. + /// + /// TODO: make private after the stop height check has moved to the syncer (#3442) + /// move shutting down the database to a blocking thread (#2188) + pub(crate) fn shutdown(&mut self) { + // Drop isn't guaranteed to run, such as when we panic, or if the tokio shutdown times out. + // + // Zebra's data should be fine if we don't clean up, because: + // - the database flushes regularly anyway + // - Zebra commits each block in a database transaction, any incomplete blocks get rolled back + // - ephemeral files are placed in the os temp dir and should be cleaned up automatically eventually + info!("flushing database to disk"); + self.db.flush().expect("flush is successful"); + + // But we should call `cancel_all_background_work` before Zebra exits. + // If we don't, we see these kinds of errors: + // ``` + // pthread lock: Invalid argument + // pure virtual method called + // terminate called without an active exception + // pthread destroy mutex: Device or resource busy + // Aborted (core dumped) + // ``` + // + // The RocksDB wiki says: + // > Q: Is it safe to close RocksDB while another thread is issuing read, write or manual compaction requests? + // > + // > A: No. The users of RocksDB need to make sure all functions have finished before they close RocksDB. + // > You can speed up the waiting by calling CancelAllBackgroundWork(). + // + // https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ + info!("stopping background database tasks"); + self.db.cancel_all_background_work(true); + + // We'd like to drop the database before deleting its files, + // because that closes the column families and the database correctly. + // But Rust's ownership rules make that difficult, + // so we just flush and delete ephemeral data instead. + // + // The RocksDB wiki says: + // > rocksdb::DB instances need to be destroyed before your main function exits. + // > RocksDB instances usually depend on some internal static variables. + // > Users need to make sure rocksdb::DB instances are destroyed before those static variables. + // + // https://github.com/facebook/rocksdb/wiki/Known-Issues + // + // But our current code doesn't seem to cause any issues. + // We might want to explicitly drop the database as part of graceful shutdown (#1678). + self.delete_ephemeral(); + } + + /// If the database is `ephemeral`, delete it. + fn delete_ephemeral(&self) { + if self.ephemeral { + let path = self.path(); + info!(cache_path = ?path, "removing temporary database files"); + + // We'd like to use `rocksdb::Env::mem_env` for ephemeral databases, + // but the Zcash blockchain might not fit in memory. So we just + // delete the database files instead. + // + // We'd like to call `DB::destroy` here, but calling destroy on a + // live DB is undefined behaviour: + // https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ#basic-readwrite + // + // So we assume that all the database files are under `path`, and + // delete them using standard filesystem APIs. Deleting open files + // might cause errors on non-Unix platforms, so we ignore the result. + // (The OS will delete them eventually anyway.) + let res = std::fs::remove_dir_all(path); + + // TODO: downgrade to debug once bugs like #2905 are fixed + // but leave any errors at "info" level + info!(?res, "removed temporary database files"); + } + } +} + +impl Drop for DiskDb { + fn drop(&mut self) { + self.shutdown(); + } +} diff --git a/zebra-state/src/service/finalized_state/disk_format.rs b/zebra-state/src/service/finalized_state/disk_format.rs index 4ae7fd624fd..654fc3ba697 100644 --- a/zebra-state/src/service/finalized_state/disk_format.rs +++ b/zebra-state/src/service/finalized_state/disk_format.rs @@ -1,4 +1,9 @@ //! Module defining the serialization format for finalized data. +//! +//! # Correctness +//! +//! The [`crate::constants::DATABASE_FORMAT_VERSION`] constant must +//! be incremented each time the database format (column, serialization, etc) changes. use std::{collections::BTreeMap, convert::TryInto, fmt::Debug, sync::Arc};