diff --git a/crates/node-core/src/metrics/prometheus_exporter.rs b/crates/node-core/src/metrics/prometheus_exporter.rs index 952c02fd8137..4a7bbccd65f3 100644 --- a/crates/node-core/src/metrics/prometheus_exporter.rs +++ b/crates/node-core/src/metrics/prometheus_exporter.rs @@ -110,11 +110,6 @@ where describe_gauge!("db.table_pages", "The number of database pages for a table"); describe_gauge!("db.table_entries", "The number of entries for a table"); describe_gauge!("db.freelist", "The number of pages on the freelist"); - describe_gauge!( - "db.timed_out_not_aborted_transactions", - "Number of timed out transactions that were not aborted by the user yet" - ); - describe_gauge!("static_files.segment_size", Unit::Bytes, "The size of a static file segment"); describe_gauge!("static_files.segment_files", "The number of files for a static file segment"); describe_gauge!( diff --git a/crates/storage/db/src/implementation/mdbx/mod.rs b/crates/storage/db/src/implementation/mdbx/mod.rs index 1cc6f85436e7..5a3e01a3e55b 100644 --- a/crates/storage/db/src/implementation/mdbx/mod.rs +++ b/crates/storage/db/src/implementation/mdbx/mod.rs @@ -123,19 +123,17 @@ impl Database for DatabaseEnv { type TXMut = tx::Tx; fn tx(&self) -> Result { - Tx::new_with_metrics( + Ok(Tx::new_with_metrics( self.inner.begin_ro_txn().map_err(|e| DatabaseError::InitTx(e.into()))?, self.metrics.as_ref().cloned(), - ) - .map_err(|e| DatabaseError::InitTx(e.into())) + )) } fn tx_mut(&self) -> Result { - Tx::new_with_metrics( + Ok(Tx::new_with_metrics( self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?, self.metrics.as_ref().cloned(), - ) - .map_err(|e| DatabaseError::InitTx(e.into())) + )) } } @@ -204,12 +202,6 @@ impl DatabaseMetrics for DatabaseEnv { metrics.push(("db.freelist", freelist as f64, vec![])); } - metrics.push(( - "db.timed_out_not_aborted_transactions", - self.timed_out_not_aborted_transactions() as f64, - vec![], - )); - metrics } } diff --git a/crates/storage/db/src/implementation/mdbx/tx.rs b/crates/storage/db/src/implementation/mdbx/tx.rs index 798b0b3c0208..16d38ead6758 100644 --- a/crates/storage/db/src/implementation/mdbx/tx.rs +++ b/crates/storage/db/src/implementation/mdbx/tx.rs @@ -53,16 +53,14 @@ impl Tx { pub fn new_with_metrics( inner: Transaction, env_metrics: Option>, - ) -> reth_libmdbx::Result { - let metrics_handler = env_metrics - .map(|env_metrics| { - let handler = MetricsHandler::::new(inner.id()?, env_metrics); - handler.env_metrics.record_opened_transaction(handler.transaction_mode()); - handler.log_transaction_opened(); - Ok(handler) - }) - .transpose()?; - Ok(Self::new_inner(inner, metrics_handler)) + ) -> Self { + let metrics_handler = env_metrics.map(|env_metrics| { + let handler = MetricsHandler::::new(inner.id(), env_metrics); + handler.env_metrics.record_opened_transaction(handler.transaction_mode()); + handler.log_transaction_opened(); + handler + }); + Self::new_inner(inner, metrics_handler) } #[inline] @@ -78,8 +76,8 @@ impl Tx { } /// Gets this transaction ID. - pub fn id(&self) -> reth_libmdbx::Result { - self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| Ok(handler.txn_id)) + pub fn id(&self) -> u64 { + self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| handler.txn_id) } /// Gets a table database handle if it exists, otherwise creates it. @@ -439,7 +437,7 @@ mod tests { assert_eq!( tx.get::(0).err(), - Some(DatabaseError::Open(reth_libmdbx::Error::ReadTransactionTimeout.into())) + Some(DatabaseError::Open(reth_libmdbx::Error::ReadTransactionAborted.into())) ); // Transaction is timeout-ed assert!(tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed)); // Backtrace is recorded diff --git a/crates/storage/libmdbx-rs/benches/cursor.rs b/crates/storage/libmdbx-rs/benches/cursor.rs index a73b4fe275e8..c05cef4b6ef6 100644 --- a/crates/storage/libmdbx-rs/benches/cursor.rs +++ b/crates/storage/libmdbx-rs/benches/cursor.rs @@ -78,7 +78,8 @@ fn bench_get_seq_raw(c: &mut Criterion) { let (_dir, env) = setup_bench_db(n); let dbi = env.begin_ro_txn().unwrap().open_db(None).unwrap().dbi(); - let txn = env.begin_ro_txn().unwrap(); + let _txn = env.begin_ro_txn().unwrap(); + let txn = _txn.txn(); let mut key = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; let mut data = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; @@ -86,21 +87,18 @@ fn bench_get_seq_raw(c: &mut Criterion) { c.bench_function("bench_get_seq_raw", |b| { b.iter(|| unsafe { - txn.txn_execute(|txn| { - mdbx_cursor_open(txn, dbi, &mut cursor); - let mut i = 0; - let mut count = 0u32; + mdbx_cursor_open(txn, dbi, &mut cursor); + let mut i = 0; + let mut count = 0u32; - while mdbx_cursor_get(cursor, &mut key, &mut data, MDBX_NEXT) == 0 { - i += key.iov_len + data.iov_len; - count += 1; - } + while mdbx_cursor_get(cursor, &mut key, &mut data, MDBX_NEXT) == 0 { + i += key.iov_len + data.iov_len; + count += 1; + } - black_box(i); - assert_eq!(count, n); - mdbx_cursor_close(cursor); - }) - .unwrap(); + black_box(i); + assert_eq!(count, n); + mdbx_cursor_close(cursor); }) }); } diff --git a/crates/storage/libmdbx-rs/benches/transaction.rs b/crates/storage/libmdbx-rs/benches/transaction.rs index dc8426c5bf31..91e2c44044c7 100644 --- a/crates/storage/libmdbx-rs/benches/transaction.rs +++ b/crates/storage/libmdbx-rs/benches/transaction.rs @@ -46,7 +46,7 @@ fn bench_get_rand_raw(c: &mut Criterion) { c.bench_function("bench_get_rand_raw", |b| { b.iter(|| unsafe { - txn.txn_execute(|txn| { + txn.with_raw_tx_ptr(|txn| { let mut i: size_t = 0; for key in &keys { key_val.iov_len = key.len() as size_t; @@ -57,8 +57,7 @@ fn bench_get_rand_raw(c: &mut Criterion) { i += key_val.iov_len; } black_box(i); - }) - .unwrap(); + }); }) }); } diff --git a/crates/storage/libmdbx-rs/src/cursor.rs b/crates/storage/libmdbx-rs/src/cursor.rs index d9b1c5c42f01..1f356a7e5579 100644 --- a/crates/storage/libmdbx-rs/src/cursor.rs +++ b/crates/storage/libmdbx-rs/src/cursor.rs @@ -1,5 +1,5 @@ use crate::{ - error::{mdbx_result, Error, Result}, + error::{mdbx_result, mdbx_result_with_tx_kind, Error, Result}, flags::*, mdbx_try_optional, transaction::{TransactionKind, RW}, @@ -30,26 +30,26 @@ where pub(crate) fn new(txn: Transaction, dbi: ffi::MDBX_dbi) -> Result { let mut cursor: *mut ffi::MDBX_cursor = ptr::null_mut(); unsafe { - txn.txn_execute(|txn_ptr| { - mdbx_result(ffi::mdbx_cursor_open(txn_ptr, dbi, &mut cursor)) - })??; + mdbx_result_with_tx_kind::( + txn.txn_execute(|txn| ffi::mdbx_cursor_open(txn, dbi, &mut cursor)), + txn.txn(), + txn.env().txn_manager(), + )?; } Ok(Self { txn, cursor }) } fn new_at_position(other: &Self) -> Result { unsafe { - other.txn.txn_execute(|_| { - let cursor = ffi::mdbx_cursor_create(ptr::null_mut()); + let cursor = ffi::mdbx_cursor_create(ptr::null_mut()); - let res = ffi::mdbx_cursor_copy(other.cursor(), cursor); + let res = ffi::mdbx_cursor_copy(other.cursor(), cursor); - let s = Self { txn: other.txn.clone(), cursor }; + let s = Self { txn: other.txn.clone(), cursor }; - mdbx_result(res)?; + mdbx_result_with_tx_kind::(res, s.txn.txn(), s.txn.env().txn_manager())?; - Ok(s) - })? + Ok(s) } } @@ -95,12 +95,11 @@ where let key_ptr = key_val.iov_base; let data_ptr = data_val.iov_base; self.txn.txn_execute(|txn| { - let v = mdbx_result(ffi::mdbx_cursor_get( - self.cursor, - &mut key_val, - &mut data_val, - op, - ))?; + let v = mdbx_result_with_tx_kind::( + ffi::mdbx_cursor_get(self.cursor, &mut key_val, &mut data_val, op), + txn, + self.txn.env().txn_manager(), + )?; assert_ne!(data_ptr, data_val.iov_base); let key_out = { // MDBX wrote in new key @@ -112,7 +111,7 @@ where }; let data_out = Value::decode_val::(txn, data_val)?; Ok((key_out, data_out, v)) - })? + }) } } @@ -445,7 +444,7 @@ impl Cursor { mdbx_result(unsafe { self.txn.txn_execute(|_| { ffi::mdbx_cursor_put(self.cursor, &key_val, &mut data_val, flags.bits()) - })? + }) })?; Ok(()) @@ -459,7 +458,7 @@ impl Cursor { /// current key, if the database was opened with [DatabaseFlags::DUP_SORT]. pub fn del(&mut self, flags: WriteFlags) -> Result<()> { mdbx_result(unsafe { - self.txn.txn_execute(|_| ffi::mdbx_cursor_del(self.cursor, flags.bits()))? + self.txn.txn_execute(|_| ffi::mdbx_cursor_del(self.cursor, flags.bits())) })?; Ok(()) @@ -471,7 +470,7 @@ where K: TransactionKind, { fn clone(&self) -> Self { - Self::new_at_position(self).unwrap() + self.txn.txn_execute(|_| Self::new_at_position(self).unwrap()) } } @@ -489,7 +488,7 @@ where K: TransactionKind, { fn drop(&mut self) { - let _ = self.txn.txn_execute(|_| unsafe { ffi::mdbx_cursor_close(self.cursor) }); + self.txn.txn_execute(|_| unsafe { ffi::mdbx_cursor_close(self.cursor) }) } } @@ -565,7 +564,7 @@ where let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; let op = mem::replace(op, *next_op); unsafe { - let result = cursor.txn.txn_execute(|txn| { + cursor.txn.txn_execute(|txn| { match ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) { ffi::MDBX_SUCCESS => { let key = match Key::decode_val::(txn, key) { @@ -584,11 +583,7 @@ where ffi::MDBX_NOTFOUND | ffi::MDBX_ENODATA => None, error => Some(Err(Error::from_err_code(error))), } - }); - match result { - Ok(result) => result, - Err(err) => Some(Err(err)), - } + }) } } Self::Err(err) => err.take().map(Err), @@ -660,7 +655,7 @@ where let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; let op = mem::replace(op, *next_op); unsafe { - let result = cursor.txn.txn_execute(|txn| { + cursor.txn.txn_execute(|txn| { match ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) { ffi::MDBX_SUCCESS => { let key = match Key::decode_val::(txn, key) { @@ -679,11 +674,7 @@ where ffi::MDBX_NOTFOUND | ffi::MDBX_ENODATA => None, error => Some(Err(Error::from_err_code(error))), } - }); - match result { - Ok(result) => result, - Err(err) => Some(Err(err)), - } + }) } } Iter::Err(err) => err.take().map(Err), @@ -761,15 +752,17 @@ where let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; let op = mem::replace(op, ffi::MDBX_NEXT_NODUP); - let err_code = - unsafe { ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) }; - - (err_code == ffi::MDBX_SUCCESS).then(|| { - IntoIter::new( - Cursor::new_at_position(&**cursor).unwrap(), - ffi::MDBX_GET_CURRENT, - ffi::MDBX_NEXT_DUP, - ) + cursor.txn.txn_execute(|_| { + let err_code = + unsafe { ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) }; + + (err_code == ffi::MDBX_SUCCESS).then(|| { + IntoIter::new( + Cursor::new_at_position(&**cursor).unwrap(), + ffi::MDBX_GET_CURRENT, + ffi::MDBX_NEXT_DUP, + ) + }) }) } IterDup::Err(err) => err.take().map(|e| IntoIter::Err(Some(e))), diff --git a/crates/storage/libmdbx-rs/src/database.rs b/crates/storage/libmdbx-rs/src/database.rs index 55eb7e0bbf57..09e2e10890bf 100644 --- a/crates/storage/libmdbx-rs/src/database.rs +++ b/crates/storage/libmdbx-rs/src/database.rs @@ -1,5 +1,5 @@ use crate::{ - error::{mdbx_result, Result}, + error::{mdbx_result_with_tx_kind, Result}, transaction::TransactionKind, Environment, Transaction, }; @@ -31,8 +31,12 @@ impl Database { let name_ptr = if let Some(c_name) = &c_name { c_name.as_ptr() } else { ptr::null() }; let mut dbi: ffi::MDBX_dbi = 0; txn.txn_execute(|txn_ptr| { - mdbx_result(unsafe { ffi::mdbx_dbi_open(txn_ptr, name_ptr, flags, &mut dbi) }) - })??; + mdbx_result_with_tx_kind::( + unsafe { ffi::mdbx_dbi_open(txn_ptr, name_ptr, flags, &mut dbi) }, + txn_ptr, + txn.env().txn_manager(), + ) + })?; Ok(Self::new_from_ptr(dbi, txn.env().clone())) } diff --git a/crates/storage/libmdbx-rs/src/environment.rs b/crates/storage/libmdbx-rs/src/environment.rs index c4ca891132a8..91bf80edbf3e 100644 --- a/crates/storage/libmdbx-rs/src/environment.rs +++ b/crates/storage/libmdbx-rs/src/environment.rs @@ -88,12 +88,6 @@ impl Environment { &self.inner.txn_manager } - /// Returns the number of timed out transactions that were not aborted by the user yet. - #[cfg(feature = "read-tx-timeouts")] - pub fn timed_out_not_aborted_transactions(&self) -> usize { - self.inner.txn_manager.timed_out_not_aborted_read_transactions().unwrap_or(0) - } - /// Create a read-only transaction for use with the environment. #[inline] pub fn begin_ro_txn(&self) -> Result> { diff --git a/crates/storage/libmdbx-rs/src/error.rs b/crates/storage/libmdbx-rs/src/error.rs index 84a6ef361747..22e440426247 100644 --- a/crates/storage/libmdbx-rs/src/error.rs +++ b/crates/storage/libmdbx-rs/src/error.rs @@ -1,3 +1,4 @@ +use crate::{txn_manager::TxnManager, TransactionKind}; use libc::c_int; use std::result; @@ -117,9 +118,8 @@ pub enum Error { /// [Mode::ReadOnly](crate::flags::Mode::ReadOnly), write transactions can't be opened. #[error("write transactions are not supported in read-only mode")] WriteTransactionUnsupportedInReadOnlyMode, - /// Read transaction has been timed out. - #[error("read transaction has been timed out")] - ReadTransactionTimeout, + #[error("read transaction has been aborted by the transaction manager")] + ReadTransactionAborted, /// Unknown error code. #[error("unknown error code")] Other(i32), @@ -193,10 +193,9 @@ impl Error { Error::DecodeErrorLenDiff | Error::DecodeError => ffi::MDBX_EINVAL, Error::Access => ffi::MDBX_EACCESS, Error::TooLarge => ffi::MDBX_TOO_LARGE, - Error::BadSignature => ffi::MDBX_EBADSIGN, + Error::BadSignature | Error::ReadTransactionAborted => ffi::MDBX_EBADSIGN, Error::WriteTransactionUnsupportedInReadOnlyMode => ffi::MDBX_EACCESS, Error::NestedTransactionsUnsupportedWithWriteMap => ffi::MDBX_EACCESS, - Error::ReadTransactionTimeout => -96000, // Custom non-MDBX error code Error::Other(err_code) => *err_code, } } @@ -217,6 +216,33 @@ pub(crate) fn mdbx_result(err_code: c_int) -> Result { } } +#[cfg(feature = "read-tx-timeouts")] +#[inline] +pub(crate) fn mdbx_result_with_tx_kind( + err_code: c_int, + txn: *mut ffi::MDBX_txn, + txn_manager: &TxnManager, +) -> Result { + if K::IS_READ_ONLY && + txn_manager.remove_aborted_read_transaction(txn).is_some() && + err_code == ffi::MDBX_EBADSIGN + { + return Err(Error::ReadTransactionAborted) + } + + mdbx_result(err_code) +} + +#[cfg(not(feature = "read-tx-timeouts"))] +#[inline] +pub(crate) fn mdbx_result_with_tx_kind( + err_code: c_int, + _txn: *mut ffi::MDBX_txn, + _txn_manager: &TxnManager, +) -> Result { + mdbx_result(err_code) +} + #[macro_export] macro_rules! mdbx_try_optional { ($expr:expr) => {{ diff --git a/crates/storage/libmdbx-rs/src/transaction.rs b/crates/storage/libmdbx-rs/src/transaction.rs index a819f8b9ffe2..9910d9dca63a 100644 --- a/crates/storage/libmdbx-rs/src/transaction.rs +++ b/crates/storage/libmdbx-rs/src/transaction.rs @@ -1,12 +1,12 @@ use crate::{ database::Database, environment::Environment, - error::{mdbx_result, Result}, + error::{mdbx_result, mdbx_result_with_tx_kind, Result}, flags::{DatabaseFlags, WriteFlags}, txn_manager::{TxnManagerMessage, TxnPtr}, Cursor, Error, Stat, TableObject, }; -use ffi::{mdbx_txn_renew, MDBX_txn_flags_t, MDBX_TXN_RDONLY, MDBX_TXN_READWRITE}; +use ffi::{MDBX_txn_flags_t, MDBX_TXN_RDONLY, MDBX_TXN_READWRITE}; use indexmap::IndexSet; use libc::{c_uint, c_void}; use parking_lot::Mutex; @@ -71,27 +71,29 @@ where pub(crate) fn new(env: Environment) -> Result { let mut txn: *mut ffi::MDBX_txn = ptr::null_mut(); unsafe { - mdbx_result(ffi::mdbx_txn_begin_ex( - env.env_ptr(), - ptr::null_mut(), - K::OPEN_FLAGS, - &mut txn, - ptr::null_mut(), - ))?; + mdbx_result_with_tx_kind::( + ffi::mdbx_txn_begin_ex( + env.env_ptr(), + ptr::null_mut(), + K::OPEN_FLAGS, + &mut txn, + ptr::null_mut(), + ), + txn, + env.txn_manager(), + )?; Ok(Self::new_from_ptr(env, txn)) } } - pub(crate) fn new_from_ptr(env: Environment, txn_ptr: *mut ffi::MDBX_txn) -> Self { - let txn = TransactionPtr::new(txn_ptr); - + pub(crate) fn new_from_ptr(env: Environment, txn: *mut ffi::MDBX_txn) -> Self { #[cfg(feature = "read-tx-timeouts")] if K::IS_READ_ONLY { - env.txn_manager().add_active_read_transaction(txn_ptr, txn.clone()) + env.txn_manager().add_active_read_transaction(txn) } let inner = TransactionInner { - txn, + txn: TransactionPtr::new(txn), primed_dbis: Mutex::new(IndexSet::new()), committed: AtomicBool::new(false), env, @@ -106,7 +108,7 @@ where /// The caller **must** ensure that the pointer is not used after the /// lifetime of the transaction. #[inline] - pub fn txn_execute(&self, f: F) -> Result + pub(crate) fn txn_execute(&self, f: F) -> T where F: FnOnce(*mut ffi::MDBX_txn) -> T, { @@ -115,18 +117,32 @@ where /// Returns a copy of the raw pointer to the underlying MDBX transaction. #[doc(hidden)] - #[cfg(test)] pub fn txn(&self) -> *mut ffi::MDBX_txn { self.inner.txn.txn } + /// Executes the given closure once + /// + /// This is only intended to be used when accessing mdbx ffi functions directly is required. + /// + /// The caller **must** ensure that the pointer is only used within the closure. + #[inline] + #[doc(hidden)] + pub fn with_raw_tx_ptr(&self, f: F) -> T + where + F: FnOnce(*mut ffi::MDBX_txn) -> T, + { + let _lock = self.inner.txn.lock.lock(); + f(self.inner.txn.txn) + } + /// Returns a raw pointer to the MDBX environment. pub fn env(&self) -> &Environment { &self.inner.env } /// Returns the transaction id. - pub fn id(&self) -> Result { + pub fn id(&self) -> u64 { self.txn_execute(|txn| unsafe { ffi::mdbx_txn_id(txn) }) } @@ -152,7 +168,7 @@ where ffi::MDBX_NOTFOUND => Ok(None), err_code => Err(Error::from_err_code(err_code)), } - })? + }) } /// Commits the transaction. @@ -175,9 +191,11 @@ where self.env().txn_manager().remove_active_read_transaction(txn); let mut latency = CommitLatency::new(); - mdbx_result(unsafe { - ffi::mdbx_txn_commit_ex(txn, latency.mdb_commit_latency()) - }) + mdbx_result_with_tx_kind::( + unsafe { ffi::mdbx_txn_commit_ex(txn, latency.mdb_commit_latency()) }, + txn, + self.env().txn_manager(), + ) .map(|v| (v, latency)) } else { let (sender, rx) = sync_channel(0); @@ -186,7 +204,7 @@ where .send_message(TxnManagerMessage::Commit { tx: TxnPtr(txn), sender }); rx.recv().unwrap() } - })?; + }); self.inner.set_committed(); result @@ -225,8 +243,12 @@ where let mut flags: c_uint = 0; unsafe { self.txn_execute(|txn| { - mdbx_result(ffi::mdbx_dbi_flags_ex(txn, db.dbi(), &mut flags, ptr::null_mut())) - })??; + mdbx_result_with_tx_kind::( + ffi::mdbx_dbi_flags_ex(txn, db.dbi(), &mut flags, ptr::null_mut()), + txn, + self.env().txn_manager(), + ) + })?; } // The types are not the same on Windows. Great! @@ -244,8 +266,12 @@ where unsafe { let mut stat = Stat::new(); self.txn_execute(|txn| { - mdbx_result(ffi::mdbx_dbi_stat(txn, dbi, stat.mdb_stat(), size_of::())) - })??; + mdbx_result_with_tx_kind::( + ffi::mdbx_dbi_stat(txn, dbi, stat.mdb_stat(), size_of::()), + txn, + self.env().txn_manager(), + ) + })?; Ok(stat) } } @@ -264,7 +290,7 @@ where #[cfg(feature = "read-tx-timeouts")] pub fn disable_timeout(&self) { if K::IS_READ_ONLY { - self.env().txn_manager().remove_active_read_transaction(self.inner.txn.txn); + self.env().txn_manager().remove_active_read_transaction(self.txn()); } } } @@ -316,11 +342,11 @@ where } #[inline] - fn txn_execute(&self, f: F) -> Result + fn txn_execute(&self, f: F) -> T where F: FnOnce(*mut ffi::MDBX_txn) -> T, { - self.txn.txn_execute_fail_on_timeout(f) + self.txn.txn_execute(f) } } @@ -329,9 +355,7 @@ where K: TransactionKind, { fn drop(&mut self) { - // To be able to abort a timed out transaction, we need to renew it first. - // Hence the usage of `txn_execute_renew_on_timeout` here. - let _ = self.txn.txn_execute_renew_on_timeout(|txn| { + self.txn_execute(|txn| { if !self.has_committed() { if K::IS_READ_ONLY { #[cfg(feature = "read-tx-timeouts")] @@ -348,7 +372,7 @@ where rx.recv().unwrap().unwrap(); } } - }); + }) } } @@ -394,7 +418,7 @@ impl Transaction { ffi::MDBX_val { iov_len: data.len(), iov_base: data.as_ptr() as *mut c_void }; mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_put(txn, dbi, &key_val, &mut data_val, flags.bits()) - })?)?; + }))?; Ok(()) } @@ -423,7 +447,7 @@ impl Transaction { &mut data_val, flags.bits() | ffi::MDBX_RESERVE, ) - })?)?; + }))?; Ok(slice::from_raw_parts_mut(data_val.iov_base as *mut u8, data_val.iov_len)) } } @@ -458,7 +482,7 @@ impl Transaction { } else { unsafe { ffi::mdbx_del(txn, dbi, &key_val, ptr::null()) } } - })? + }) }) .map(|_| true) .or_else(|e| match e { @@ -469,7 +493,7 @@ impl Transaction { /// Empties the given database. All items will be removed. pub fn clear_db(&self, dbi: ffi::MDBX_dbi) -> Result<()> { - mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, dbi, false) })?)?; + mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, dbi, false) }))?; Ok(()) } @@ -480,7 +504,7 @@ impl Transaction { /// Caller must close ALL other [Database] and [Cursor] instances pointing to the same dbi /// BEFORE calling this function. pub unsafe fn drop_db(&self, db: Database) -> Result<()> { - mdbx_result(self.txn_execute(|txn| ffi::mdbx_drop(txn, db.dbi(), true))?)?; + mdbx_result(self.txn_execute(|txn| ffi::mdbx_drop(txn, db.dbi(), true)))?; Ok(()) } @@ -493,7 +517,11 @@ impl Transaction { /// Caller must close ALL other [Database] and [Cursor] instances pointing to the same dbi /// BEFORE calling this function. pub unsafe fn close_db(&self, db: Database) -> Result<()> { - self.txn_execute(|_| mdbx_result(ffi::mdbx_dbi_close(self.env().env_ptr(), db.dbi())))??; + mdbx_result_with_tx_kind::( + ffi::mdbx_dbi_close(self.env().env_ptr(), db.dbi()), + self.txn(), + self.env().txn_manager(), + )?; Ok(()) } @@ -514,12 +542,12 @@ impl Transaction { }); rx.recv().unwrap().map(|ptr| Transaction::new_from_ptr(self.env().clone(), ptr.0)) - })? + }) } } /// A shareable pointer to an MDBX transaction. -#[derive(Debug, Clone)] +#[derive(Clone)] pub(crate) struct TransactionPtr { txn: *mut ffi::MDBX_txn, lock: Arc>, @@ -530,52 +558,14 @@ impl TransactionPtr { Self { txn, lock: Arc::new(Mutex::new(())) } } - // Returns `true` if the transaction is timed out. - // - // When transaction is timed out via `TxnManager`, it's actually reset using - // `mdbx_txn_reset`. It makes the transaction unusable (MDBX fails on any usages of such - // transactions), and sets the `MDBX_TXN_FINISHED` flag. - fn is_timed_out(&self) -> bool { - (unsafe { ffi::mdbx_txn_flags(self.txn) } & ffi::MDBX_TXN_FINISHED) != 0 - } - /// Executes the given closure once the lock on the transaction is acquired. - /// - /// Returns the result of the closure or an error if the transaction is timed out. - #[inline] - pub(crate) fn txn_execute_fail_on_timeout(&self, f: F) -> Result - where - F: FnOnce(*mut ffi::MDBX_txn) -> T, - { - let _lck = self.lock.lock(); - - // No race condition with the `TxnManager` timing out the transaction is possible here, - // because we're taking a lock for any actions on the transaction pointer, including a call - // to the `mdbx_txn_reset`. - if self.is_timed_out() { - return Err(Error::ReadTransactionTimeout) - } - - Ok((f)(self.txn)) - } - - /// Executes the given closure once the lock on the transaction is acquired. If the tranasction - /// is timed out, it will be renewed first. - /// - /// Returns the result of the closure or an error if the transaction renewal fails. #[inline] - fn txn_execute_renew_on_timeout(&self, f: F) -> Result + pub(crate) fn txn_execute(&self, f: F) -> T where F: FnOnce(*mut ffi::MDBX_txn) -> T, { let _lck = self.lock.lock(); - - // To be able to do any operations on the transaction, we need to renew it first. - if self.is_timed_out() { - mdbx_result(unsafe { mdbx_txn_renew(self.txn) })?; - } - - Ok((f)(self.txn)) + (f)(self.txn) } } diff --git a/crates/storage/libmdbx-rs/src/txn_manager.rs b/crates/storage/libmdbx-rs/src/txn_manager.rs index 82d53d9089ab..bf46680b81e5 100644 --- a/crates/storage/libmdbx-rs/src/txn_manager.rs +++ b/crates/storage/libmdbx-rs/src/txn_manager.rs @@ -52,6 +52,9 @@ impl TxnManager { /// - [TxnManagerMessage::Abort] aborts a transaction with [ffi::mdbx_txn_abort] /// - [TxnManagerMessage::Commit] commits a transaction with [ffi::mdbx_txn_commit_ex] fn start_message_listener(&self, env: EnvPtr, rx: Receiver) { + #[cfg(feature = "read-tx-timeouts")] + let read_transactions = self.read_transactions.clone(); + std::thread::spawn(move || { #[allow(clippy::redundant_locals)] let env = env; @@ -70,12 +73,34 @@ impl TxnManager { ) }) .map(|_| TxnPtr(txn)); + + #[cfg(feature = "read-tx-timeouts")] + { + use crate::transaction::TransactionKind; + + if res.is_ok() && flags == crate::transaction::RO::OPEN_FLAGS { + if let Some(read_transactions) = &read_transactions { + read_transactions.add_active(txn); + } + } + } + sender.send(res).unwrap(); } TxnManagerMessage::Abort { tx, sender } => { + #[cfg(feature = "read-tx-timeouts")] + if let Some(read_transactions) = &read_transactions { + read_transactions.remove_active(tx.0); + } + sender.send(mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) })).unwrap(); } TxnManagerMessage::Commit { tx, sender } => { + #[cfg(feature = "read-tx-timeouts")] + if let Some(read_transactions) = &read_transactions { + read_transactions.remove_active(tx.0); + } + sender .send({ let mut latency = CommitLatency::new(); @@ -100,10 +125,7 @@ impl TxnManager { #[cfg(feature = "read-tx-timeouts")] mod read_transactions { - use crate::{ - environment::EnvPtr, error::mdbx_result, transaction::TransactionPtr, - txn_manager::TxnManager, - }; + use crate::{environment::EnvPtr, error::mdbx_result, txn_manager::TxnManager, Error}; use dashmap::{DashMap, DashSet}; use std::{ sync::{mpsc::sync_channel, Arc}, @@ -133,13 +155,9 @@ mod read_transactions { } /// Adds a new transaction to the list of active read transactions. - pub(crate) fn add_active_read_transaction( - &self, - ptr: *mut ffi::MDBX_txn, - tx: TransactionPtr, - ) { + pub(crate) fn add_active_read_transaction(&self, ptr: *mut ffi::MDBX_txn) { if let Some(read_transactions) = &self.read_transactions { - read_transactions.add_active(ptr, tx); + read_transactions.add_active(ptr); } } @@ -147,15 +165,16 @@ mod read_transactions { pub(crate) fn remove_active_read_transaction( &self, ptr: *mut ffi::MDBX_txn, - ) -> Option<(usize, (TransactionPtr, Instant))> { + ) -> Option<(usize, Instant)> { self.read_transactions.as_ref()?.remove_active(ptr) } - /// Returns the number of timed out transactions that were not aborted by the user yet. - pub(crate) fn timed_out_not_aborted_read_transactions(&self) -> Option { - self.read_transactions - .as_ref() - .map(|read_transactions| read_transactions.timed_out_not_aborted()) + /// Removes a transaction from the list of aborted read transactions. + pub(crate) fn remove_aborted_read_transaction( + &self, + ptr: *mut ffi::MDBX_txn, + ) -> Option { + self.read_transactions.as_ref()?.remove_aborted(ptr) } } @@ -168,10 +187,13 @@ mod read_transactions { /// /// We store `usize` instead of a raw pointer as a key, because pointers are not /// comparable. The time of transaction opening is stored as a value. - active: DashMap, - /// List of timed out transactions that were not aborted by the user yet, hence have a - /// dangling read transaction pointer. - timed_out_not_aborted: DashSet, + active: DashMap, + /// List of read transactions aborted by the [ReadTransactions::start_monitor]. + /// We keep them until user tries to abort the transaction, so we're able to report a nice + /// [Error::ReadTransactionAborted] error. + /// + /// We store `usize` instead of a raw pointer, because pointers are not comparable. + aborted: DashSet, } impl ReadTransactions { @@ -180,70 +202,59 @@ mod read_transactions { } /// Adds a new transaction to the list of active read transactions. - pub(super) fn add_active(&self, ptr: *mut ffi::MDBX_txn, tx: TransactionPtr) { - let _ = self.active.insert(ptr as usize, (tx, Instant::now())); + pub(super) fn add_active(&self, ptr: *mut ffi::MDBX_txn) { + let _ = self.active.insert(ptr as usize, Instant::now()); } /// Removes a transaction from the list of active read transactions. - pub(super) fn remove_active( - &self, - ptr: *mut ffi::MDBX_txn, - ) -> Option<(usize, (TransactionPtr, Instant))> { - self.timed_out_not_aborted.remove(&(ptr as usize)); + pub(super) fn remove_active(&self, ptr: *mut ffi::MDBX_txn) -> Option<(usize, Instant)> { self.active.remove(&(ptr as usize)) } - /// Returns the number of timed out transactions that were not aborted by the user yet. - pub(super) fn timed_out_not_aborted(&self) -> usize { - self.timed_out_not_aborted.len() + /// Adds a new transaction to the list of aborted read transactions. + pub(super) fn add_aborted(&self, ptr: *mut ffi::MDBX_txn) { + self.aborted.insert(ptr as usize); + } + + /// Removes a transaction from the list of aborted read transactions. + pub(super) fn remove_aborted(&self, ptr: *mut ffi::MDBX_txn) -> Option { + self.aborted.remove(&(ptr as usize)) } /// Spawns a new thread with [std::thread::spawn] that monitors the list of active read - /// transactions and timeouts those that are open for longer than + /// transactions and aborts those that are open for longer than /// `ReadTransactions.max_duration`. + /// + /// Aborted transaction pointers are placed into the list of aborted read transactions, and + /// removed from this list by [crate::error::mdbx_result_with_tx_kind] when the user tries + /// to use it. pub(super) fn start_monitor(self: Arc) { std::thread::spawn(move || { - let mut timed_out_active = Vec::new(); + let mut aborted_active = Vec::new(); loop { let now = Instant::now(); let mut max_active_transaction_duration = None; - // Iterate through active read transactions and time out those that's open for + // Iterate through active read transactions and abort those that's open for // longer than `self.max_duration`. for entry in self.active.iter() { - let (tx, start) = entry.value(); + let (ptr, start) = entry.pair(); let duration = now - *start; if duration > self.max_duration { - let result = tx.txn_execute_fail_on_timeout(|txn_ptr| { - ( - txn_ptr, - duration, - // Time out the transaction. - // - // We use `mdbx_txn_reset` instead of `mdbx_txn_abort` here to - // prevent MDBX from reusing the pointer of the aborted - // transaction for new read-only transactions. This is - // important because we store the pointer in the `active` list - // and assume that it is unique. - // - // See https://erthink.github.io/libmdbx/group__c__transactions.html#gae9f34737fe60b0ba538d5a09b6a25c8d for more info. - mdbx_result(unsafe { ffi::mdbx_txn_reset(txn_ptr) }), - ) - }); - - match result { - Ok((txn_ptr, duration, error)) => { - // Add the transaction to `timed_out_active`. We can't remove it - // instantly from the list of active transactions, because we - // iterate through it. - timed_out_active.push((txn_ptr, duration, error)); - } - Err(err) => { - error!(target: "libmdbx", %err, "Failed to abort the long-lived read transaction") - } - } + let ptr = *ptr as *mut ffi::MDBX_txn; + + // Add the transaction to the list of aborted transactions, so further + // usages report the correct error when the transaction is closed. + self.add_aborted(ptr); + + // Abort the transaction + let result = mdbx_result(unsafe { ffi::mdbx_txn_abort(ptr) }); + + // Add the transaction to `aborted_active`. We can't remove it instantly + // from the list of active transactions, because we iterate through it. + aborted_active.push((ptr, duration, result.err())); } else { max_active_transaction_duration = Some( duration.max(max_active_transaction_duration.unwrap_or_default()), @@ -251,38 +262,40 @@ mod read_transactions { } } - // Walk through timed out transactions, and delete them from the list of active + // Walk through aborted transactions, and delete them from the list of active // transactions. - for (ptr, open_duration, err) in timed_out_active.iter().copied() { + for (ptr, open_duration, err) in aborted_active.iter().copied() { // Try deleting the transaction from the list of active transactions. let was_in_active = self.remove_active(ptr).is_some(); - if let Err(err) = err { - if was_in_active { - // If the transaction was in the list of active transactions, - // then user didn't abort it and we failed to do so. - error!(target: "libmdbx", %err, ?open_duration, "Failed to time out the long-lived read transaction"); + if let Some(err) = err { + // If there was an error when aborting the transaction, we need to + // remove it from the list of aborted transactions, because otherwise it + // will stay there forever. + self.remove_aborted(ptr); + if was_in_active && err != Error::BadSignature { + // If the transaction was in the list of active transactions and the + // error code is not `EBADSIGN`, then user didn't abort it. + error!(target: "libmdbx", %err, ?open_duration, "Failed to abort the long-lived read transactions"); } } else { - // Happy path, the transaction has been timed out by us with no errors. - warn!(target: "libmdbx", ?open_duration, "Long-lived read transaction has been timed out"); - // Add transaction to the list of timed out transactions that were not - // aborted by the user yet. - self.timed_out_not_aborted.insert(ptr as usize); + // Happy path, the transaction has been aborted by us with no errors. + warn!(target: "libmdbx", ?open_duration, "Long-lived read transactions has been aborted"); } } - // Clear the list of timed out transactions, but not de-allocate the reserved + // Clear the list of aborted transactions, but not de-allocate the reserved // capacity to save on further pushes. - timed_out_active.clear(); + aborted_active.clear(); - if !self.active.is_empty() { + if !self.active.is_empty() || !self.aborted.is_empty() { trace!( target: "libmdbx", elapsed = ?now.elapsed(), active = ?self.active.iter().map(|entry| { - let (tx, start) = entry.value(); - (tx.clone(), start.elapsed()) + let (ptr, start) = entry.pair(); + (*ptr, start.elapsed()) }).collect::>(), + aborted = ?self.aborted.iter().map(|entry| *entry).collect::>(), "Read transactions" ); } @@ -302,10 +315,12 @@ mod read_transactions { #[cfg(test)] mod tests { use crate::{ - txn_manager::read_transactions::READ_TRANSACTIONS_CHECK_INTERVAL, Environment, Error, - MaxReadTransactionDuration, + txn_manager::{ + read_transactions::READ_TRANSACTIONS_CHECK_INTERVAL, TxnManagerMessage, TxnPtr, + }, + Environment, Error, MaxReadTransactionDuration, TransactionKind, RO, }; - use std::{thread::sleep, time::Duration}; + use std::{ptr, sync::mpsc::sync_channel, thread::sleep, time::Duration}; use tempfile::tempdir; #[test] @@ -330,6 +345,7 @@ mod read_transactions { drop(tx); assert!(!read_transactions.active.contains_key(&tx_ptr)); + assert!(!read_transactions.aborted.contains(&tx_ptr)); } // Create a read-only transaction, successfully use it, close it by committing. @@ -342,43 +358,24 @@ mod read_transactions { tx.commit().unwrap(); assert!(!read_transactions.active.contains_key(&tx_ptr)); + assert!(!read_transactions.aborted.contains(&tx_ptr)); } + // Create a read-only transaction, wait until `MAX_DURATION` time is elapsed so the + // manager kills it, use it and observe the `Error::ReadTransactionAborted` error. { - // Create a read-only transaction and observe it's in the list of active - // transactions. let tx = env.begin_ro_txn().unwrap(); let tx_ptr = tx.txn() as usize; assert!(read_transactions.active.contains_key(&tx_ptr)); - // Wait until the transaction is timed out by the manager. sleep(MAX_DURATION + READ_TRANSACTIONS_CHECK_INTERVAL); - // Ensure that the transaction is not in the list of active transactions anymore, - // and is in the list of timed out but not aborted transactions. - assert!(!read_transactions.active.contains_key(&tx_ptr)); - assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr)); - - // Use the timed out transaction and observe the `Error::ReadTransactionTimeout` - assert_eq!(tx.open_db(None).err(), Some(Error::ReadTransactionTimeout)); assert!(!read_transactions.active.contains_key(&tx_ptr)); - assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr)); + assert!(read_transactions.aborted.contains(&tx_ptr)); - assert_eq!(tx.id().err(), Some(Error::ReadTransactionTimeout)); + assert_eq!(tx.open_db(None).err(), Some(Error::ReadTransactionAborted)); assert!(!read_transactions.active.contains_key(&tx_ptr)); - assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr)); - - // Ensure that the transaction pointer is not reused when opening a new read-only - // transaction. - let new_tx = env.begin_ro_txn().unwrap(); - let new_tx_ptr = new_tx.txn() as usize; - assert!(read_transactions.active.contains_key(&new_tx_ptr)); - assert_ne!(tx_ptr, new_tx_ptr); - - // Drop the transaction and ensure that it's not in the list of timed out but not - // aborted transactions anymore. - drop(tx); - assert!(!read_transactions.timed_out_not_aborted.contains(&tx_ptr)); + assert!(!read_transactions.aborted.contains(&tx_ptr)); } } @@ -396,5 +393,64 @@ mod read_transactions { sleep(READ_TRANSACTIONS_CHECK_INTERVAL); assert!(tx.commit().is_ok()) } + + #[test] + fn txn_manager_begin_read_transaction_via_message_listener() { + const MAX_DURATION: Duration = Duration::from_secs(1); + + let dir = tempdir().unwrap(); + let env = Environment::builder() + .set_max_read_transaction_duration(MaxReadTransactionDuration::Set(MAX_DURATION)) + .open(dir.path()) + .unwrap(); + + let read_transactions = env.txn_manager().read_transactions.as_ref().unwrap(); + + // Create a read-only transaction via the message listener. + let (tx, rx) = sync_channel(0); + env.txn_manager().send_message(TxnManagerMessage::Begin { + parent: TxnPtr(ptr::null_mut()), + flags: RO::OPEN_FLAGS, + sender: tx, + }); + + let txn_ptr = rx.recv().unwrap().unwrap(); + + assert!(read_transactions.active.contains_key(&(txn_ptr.0 as usize))); + } + + #[test] + fn txn_manager_reassign_transaction_removes_from_aborted_transactions() { + const MAX_DURATION: Duration = Duration::from_secs(1); + + let dir = tempdir().unwrap(); + let env = Environment::builder() + .set_max_read_transaction_duration(MaxReadTransactionDuration::Set(MAX_DURATION)) + .open(dir.path()) + .unwrap(); + + let read_transactions = env.txn_manager().read_transactions.as_ref().unwrap(); + + // Create a read-only transaction, wait until `MAX_DURATION` time is elapsed so the + // manager kills it, use it and observe the `Error::ReadTransactionAborted` error. + { + let tx = env.begin_ro_txn().unwrap(); + let tx_ptr = tx.txn() as usize; + assert!(read_transactions.active.contains_key(&tx_ptr)); + + sleep(MAX_DURATION + READ_TRANSACTIONS_CHECK_INTERVAL); + + assert!(!read_transactions.active.contains_key(&tx_ptr)); + assert!(read_transactions.aborted.contains(&tx_ptr)); + } + + // Create a read-only transaction, ensure this removes it from aborted set if mdbx + // reassigns same recently aborted transaction pointer. + { + let tx = env.begin_ro_txn().unwrap(); + let tx_ptr = tx.txn() as usize; + assert!(!read_transactions.aborted.contains(&tx_ptr)); + } + } } }