diff --git a/CLAUDE.md b/CLAUDE.md index f8fc8c1..09c55a8 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -33,13 +33,28 @@ be mediated via the `TxAccess` trait. ## API Patterns -### Cursor Creation +### Cursor Creation and Caching ```rust let db = txn.open_db(None).unwrap(); // Returns Database (has dbi + flags) let cursor = txn.cursor(db).unwrap(); // Takes Database, NOT raw dbi ``` +Cursors are transparently cached within transactions. When a cursor is +dropped, its raw pointer is returned to the transaction's cache. Subsequent +`cursor()` calls reuse cached pointers, avoiding `mdbx_cursor_open`/ +`mdbx_cursor_close` overhead (~100 ns per cycle). The cache is drained +and all pointers closed on commit or abort. + +`DbCache` (in `src/tx/cache.rs`) stores raw `*mut ffi::MDBX_cursor` +pointers, which makes it `!Send + !Sync` by default. Explicit `unsafe impl +Send + Sync for DbCache` is required because: +- `SyncKind::Cache` requires `Cache + Send` (for `RefCell: Send`) +- `SharedCache` uses `Arc>` which requires `DbCache: Send + Sync` +- This is sound because `Cursor` itself is already `unsafe impl Send + Sync`, + and all access to cached pointers is mediated by `RefCell` (unsync) or + `RwLock` (sync) + ### Database Flags Validation DUP_SORT/DUP_FIXED methods validate flags at runtime: diff --git a/src/tx/access.rs b/src/tx/access.rs index f0dec56..33e10c4 100644 --- a/src/tx/access.rs +++ b/src/tx/access.rs @@ -1,13 +1,20 @@ use crate::{ Environment, sys::txn_manager::{Abort, RawTxPtr}, + tx::{ + cache::{Cache, DbCache}, + ops, + }, }; use core::fmt; use parking_lot::{Mutex, MutexGuard}; -use std::sync::{ - Arc, - atomic::{AtomicBool, Ordering}, - mpsc::sync_channel, +use std::{ + cell::RefCell, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + mpsc::sync_channel, + }, }; use tracing::debug_span; @@ -29,8 +36,16 @@ mod sealed { /// are stored for read-only and read-write transactions. It ensures that /// the transaction pointer can be accessed safely, respecting timeouts /// and ownership semantics. +/// +/// The associated [`Cache`] type co-locates the cursor cache with the +/// transaction so that the cache lifetime is bound to the transaction +/// pointer's: cached cursors are drained and closed exactly once, in the +/// implementing type's `Drop`, before the transaction is aborted. #[allow(unreachable_pub)] pub trait TxPtrAccess: fmt::Debug + sealed::Sealed { + /// Cache type co-located with this transaction pointer. + type Cache: Cache; + /// Create an instance of the implementing type from a raw transaction /// pointer. fn from_ptr_and_env(ptr: *mut ffi::MDBX_txn, env: Environment, is_read_only: bool) -> Self @@ -45,6 +60,10 @@ pub trait TxPtrAccess: fmt::Debug + sealed::Sealed { /// Mark the transaction as committed. fn mark_committed(&self); + /// Returns a reference to the cursor/database cache associated with + /// this transaction. + fn cache(&self) -> &Self::Cache; + /// Get the transaction ID by making a call into the MDBX C API. fn tx_id(&self) -> Option { let mut id = 0; @@ -60,6 +79,8 @@ impl TxPtrAccess for Arc where T: TxPtrAccess, { + type Cache = T::Cache; + fn from_ptr_and_env(ptr: *mut ffi::MDBX_txn, env: Environment, is_read_only: bool) -> Self where Self: Sized, @@ -77,12 +98,17 @@ where fn mark_committed(&self) { self.as_ref().mark_committed(); } + + fn cache(&self) -> &Self::Cache { + self.as_ref().cache() + } } -/// Wrapper for raw txn pointer for RW transactions. +/// Wrapper for raw txn pointer for unsynchronized transactions. pub struct PtrUnsync { committed: AtomicBool, ptr: *mut ffi::MDBX_txn, + cache: RefCell, } impl fmt::Debug for PtrUnsync { @@ -92,11 +118,13 @@ impl fmt::Debug for PtrUnsync { } impl TxPtrAccess for PtrUnsync { + type Cache = RefCell; + fn from_ptr_and_env(ptr: *mut ffi::MDBX_txn, _env: Environment, _is_read_only: bool) -> Self where Self: Sized, { - Self { committed: AtomicBool::new(false), ptr } + Self { committed: AtomicBool::new(false), ptr, cache: RefCell::new(DbCache::default()) } } fn with_txn_ptr(&self, f: F) -> R @@ -111,6 +139,10 @@ impl TxPtrAccess for PtrUnsync { // Type is neither Sync nor Send, so no concurrent access is possible. unsafe { *self.committed.as_ptr() = true }; } + + fn cache(&self) -> &Self::Cache { + &self.cache + } } impl Drop for PtrUnsync { @@ -118,6 +150,13 @@ impl Drop for PtrUnsync { // SAFETY: // We have exclusive ownership of this pointer. unsafe { + // Close any cached cursors before the txn ends. No-op if + // commit/abort already drained. + for cursor in self.cache.get_mut().drain_cursors() { + // SAFETY: cursor was returned from an earlier Cursor::drop + // bound to this still-live txn. + ops::cursor_close_raw(cursor); + } if !*self.committed.as_ptr() { ffi::mdbx_txn_abort(self.ptr); } @@ -139,9 +178,10 @@ pub struct PtrSync { /// Whether the transaction was committed. committed: AtomicBool, - /// Contains a lock to ensure exclusive access to the transaction. - /// The inner boolean indicates the timeout status. - lock: Mutex<()>, + /// Lock that serialises access to the transaction pointer **and** + /// guards the cursor/database cache. Held inside `with_txn_ptr`; also + /// taken implicitly via the `Cache` impl on `Mutex`. + lock: Mutex, /// The environment that owns the transaction. env: Environment, @@ -159,7 +199,7 @@ unsafe impl Sync for PtrSync {} impl PtrSync { /// Acquires the inner transaction lock to guarantee exclusive access to the transaction /// pointer. - pub(crate) fn lock(&self) -> MutexGuard<'_, ()> { + pub(crate) fn lock(&self) -> MutexGuard<'_, DbCache> { if let Some(lock) = self.lock.try_lock() { lock } else { @@ -176,13 +216,15 @@ impl PtrSync { } impl TxPtrAccess for PtrSync { + type Cache = Mutex; + fn from_ptr_and_env(ptr: *mut ffi::MDBX_txn, env: Environment, is_read_only: bool) -> Self where Self: Sized, { Self { committed: AtomicBool::new(false), - lock: Mutex::new(()), + lock: Mutex::new(DbCache::default()), txn: ptr, env, is_read_only, @@ -200,10 +242,24 @@ impl TxPtrAccess for PtrSync { fn mark_committed(&self) { self.committed.store(true, Ordering::SeqCst); } + + fn cache(&self) -> &Self::Cache { + &self.lock + } } impl Drop for PtrSync { fn drop(&mut self) { + // Close any cached cursors before the transaction ends. Runs + // exactly once: this `Drop` only fires when the last `Arc` + // is released, so there is no race with surviving `TxSync` clones. + // No-op if `commit_inner` already drained the cache. + for cursor in self.lock.get_mut().drain_cursors() { + // SAFETY: cursor was returned from an earlier Cursor::drop + // bound to this still-live txn. + unsafe { ops::cursor_close_raw(cursor) }; + } + if self.committed.load(Ordering::SeqCst) { return; } diff --git a/src/tx/cache.rs b/src/tx/cache.rs index ff236bc..4bd76a7 100644 --- a/src/tx/cache.rs +++ b/src/tx/cache.rs @@ -1,35 +1,37 @@ -//! Caches for [`Database`] info, used by the [`TxSync`] and [`TxUnsync`] types. +//! Caches for [`Database`] info and cursor pointers, used by the [`TxSync`] +//! and [`TxUnsync`] types. //! -//! This module defines cache types for storing database handles within -//! transactions. Caches improve performance by avoiding repeated lookups of -//! database information. +//! This module defines cache types for storing database handles and cached +//! cursor pointers within transactions. The cache is co-located with the +//! transaction pointer (in [`PtrSync`]/[`PtrUnsync`]) so that its lifetime +//! is bound to the transaction's; this guarantees cached cursors are closed +//! exactly once, before the transaction is aborted or committed, even when +//! a `TxSync` is cloned across threads. //! -//! The primary caches are: -//! - [`DbCache`]: A simple inline cache using `SmallVec` for efficient storage -//! of a small number of database handles. Used in unsynchronized -//! transactions via [`RefCell`]. -//! - [`SharedCache`]: A thread-safe cache using `Arc>` for -//! synchronized transactions. +//! The container is [`DbCache`], used either: +//! - inline behind a [`RefCell`] for unsynchronized transactions, or +//! - inline behind a [`parking_lot::Mutex`] for synchronized transactions +//! (the same mutex that serialises raw txn-pointer access). //! //! [`TxSync`]: crate::tx::aliases::TxSync //! [`TxUnsync`]: crate::tx::aliases::TxUnsync +//! [`PtrSync`]: crate::tx::PtrSync +//! [`PtrUnsync`]: crate::tx::PtrUnsync use crate::Database; -use parking_lot::RwLock; +use parking_lot::Mutex; use smallvec::SmallVec; use std::{ cell::RefCell, hash::{Hash, Hasher}, - sync::Arc, }; -/// Cache trait for transaction-local database handles. +/// Cache trait for transaction-local database handles and cursors. /// -/// This is used by the [`SyncKind`] trait to define the cache type for each -/// transaction kind. -/// -/// [`SyncKind`]: crate::tx::kind::SyncKind -pub trait Cache: Clone + Default + std::fmt::Debug { +/// Implemented on the concrete container types that wrap a [`DbCache`] +/// (e.g. [`Mutex`] for synchronized transactions and +/// [`RefCell`] for unsynchronized ones). +pub trait Cache: std::fmt::Debug { /// Read a database entry from the cache. fn read_db(&self, name_hash: u64) -> Option; @@ -38,6 +40,35 @@ pub trait Cache: Clone + Default + std::fmt::Debug { /// Remove a database entry from the cache by dbi. fn remove_dbi(&self, dbi: ffi::MDBX_dbi); + + /// Take a cached cursor for the given DBI, if one exists. + fn take_cursor(&self, dbi: ffi::MDBX_dbi) -> Option<*mut ffi::MDBX_cursor>; + + /// Return a cursor to the cache for later reuse. + fn return_cursor(&self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor); + + /// Drain all cached cursors, returning their raw pointers. + /// The caller is responsible for closing them via FFI. + fn drain_cursors(&self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]>; + + /// Drain cached cursors for a specific DBI, returning their raw pointers. + /// The caller is responsible for closing them via FFI. + fn drain_cursors_for_dbi(&self, dbi: ffi::MDBX_dbi) -> SmallVec<[*mut ffi::MDBX_cursor; 8]>; + + /// Returns the total number of cached cursors across all DBIs. + #[cfg(test)] + fn cursor_count(&self) -> usize; + + /// Injects a raw cursor pointer into the cache for the given DBI. + /// + /// # Safety + /// + /// - `cursor` must either be a valid MDBX cursor pointer bound to the + /// enclosing transaction, or a pointer whose `mdbx_cursor_close2` + /// behaviour the caller accepts (tests that trigger failure paths + /// knowingly inject unusual values here). + #[cfg(test)] + unsafe fn inject_cursor(&self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor); } /// Cached database entry. @@ -73,107 +104,161 @@ impl From for Database { } } -/// Simple cache container for database handles. +/// Simple cache container for database handles and cursor pointers. /// /// Uses inline storage for the common case (most apps use < 16 databases). -#[derive(Debug, Default, Clone)] -#[repr(transparent)] -pub struct DbCache(SmallVec<[CachedDb; 16]>); +#[derive(Debug)] +pub struct DbCache { + dbs: SmallVec<[CachedDb; 16]>, + cursors: SmallVec<[(ffi::MDBX_dbi, *mut ffi::MDBX_cursor); 8]>, +} + +// SAFETY: DbCache contains `*mut ffi::MDBX_cursor` which is `!Send + !Sync`. +// These are raw MDBX cursor pointers bound to a transaction, not a thread. +// `Cursor` itself is already `Send + Sync` (see cursor.rs), so caching the +// same pointers here introduces no new unsoundness. All access to these +// pointers is mediated by `RefCell` (unsync path) or `RwLock` (sync path), +// ensuring no concurrent mutation. +unsafe impl Send for DbCache {} +unsafe impl Sync for DbCache {} + +impl Default for DbCache { + fn default() -> Self { + Self { dbs: SmallVec::new(), cursors: SmallVec::new() } + } +} impl DbCache { /// Read a database entry from the cache. fn read_db(&self, name_hash: u64) -> Option { - for entry in self.0.iter() { - if entry.name_hash == name_hash { - return Some(entry.db); - } - } - None + self.dbs.iter().find(|e| e.name_hash == name_hash).map(|e| e.db) } /// Write a database entry to the cache. fn write_db(&mut self, db: CachedDb) { - for entry in self.0.iter() { - if entry.name_hash == db.name_hash { - return; // Another thread beat us - } + if self.dbs.iter().any(|e| e.name_hash == db.name_hash) { + return; } - self.0.push(db); + self.dbs.push(db); } /// Remove a database entry from the cache by dbi. fn remove_dbi(&mut self, dbi: ffi::MDBX_dbi) { - self.0.retain(|entry| entry.db.dbi() != dbi); + self.dbs.retain(|entry| entry.db.dbi() != dbi); } -} -/// Simple cache container for database handles. -/// -/// Uses inline storage for the common case (most apps use < 16 databases). -#[derive(Debug, Clone)] -pub struct SharedCache { - cache: Arc>, -} + /// Take a cached cursor for the given DBI, if one exists. + fn take_cursor(&mut self, dbi: ffi::MDBX_dbi) -> Option<*mut ffi::MDBX_cursor> { + self.cursors.iter().position(|(d, _)| *d == dbi).map(|i| self.cursors.swap_remove(i).1) + } -impl SharedCache { - /// Creates a new empty cache. - fn new() -> Self { - Self { cache: Arc::new(RwLock::new(DbCache::default())) } + /// Return a cursor to the cache for later reuse. + fn return_cursor(&mut self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor) { + self.cursors.push((dbi, cursor)); } - /// Returns a read guard to the cache. - fn read(&self) -> parking_lot::RwLockReadGuard<'_, DbCache> { - self.cache.read() + /// Drain all cached cursors, returning their raw pointers. + pub(crate) fn drain_cursors(&mut self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { + self.cursors.drain(..).map(|(_, c)| c).collect() } - /// Returns a write guard to the cache. - fn write(&self) -> parking_lot::RwLockWriteGuard<'_, DbCache> { - self.cache.write() + /// Drain cached cursors for a specific DBI, returning their raw pointers. + fn drain_cursors_for_dbi( + &mut self, + dbi: ffi::MDBX_dbi, + ) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { + let mut drained = SmallVec::new(); + self.cursors.retain(|(d, c)| { + if *d == dbi { + drained.push(*c); + false + } else { + true + } + }); + drained } } -impl Cache for SharedCache { - /// Read a database entry from the cache. +impl Cache for Mutex { fn read_db(&self, name_hash: u64) -> Option { - let cache = self.read(); - cache.read_db(name_hash) + self.lock().read_db(name_hash) } - /// Write a database entry to the cache. fn write_db(&self, db: CachedDb) { - let mut cache = self.write(); - cache.write_db(db); + self.lock().write_db(db); } - /// Remove a database entry from the cache by dbi. fn remove_dbi(&self, dbi: ffi::MDBX_dbi) { - let mut cache = self.write(); - cache.remove_dbi(dbi); + self.lock().remove_dbi(dbi); } -} -impl Default for SharedCache { - fn default() -> Self { - Self::new() + fn take_cursor(&self, dbi: ffi::MDBX_dbi) -> Option<*mut ffi::MDBX_cursor> { + self.lock().take_cursor(dbi) + } + + fn return_cursor(&self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor) { + self.lock().return_cursor(dbi, cursor); + } + + fn drain_cursors(&self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { + self.lock().drain_cursors() + } + + fn drain_cursors_for_dbi(&self, dbi: ffi::MDBX_dbi) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { + self.lock().drain_cursors_for_dbi(dbi) + } + + #[cfg(test)] + fn cursor_count(&self) -> usize { + self.lock().cursors.len() + } + + #[cfg(test)] + unsafe fn inject_cursor(&self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor) { + self.lock().cursors.push((dbi, cursor)); } } impl Cache for RefCell { /// Read a database entry from the cache. fn read_db(&self, name_hash: u64) -> Option { - let cache = self.borrow(); - cache.read_db(name_hash) + self.borrow().read_db(name_hash) } /// Write a database entry to the cache. fn write_db(&self, db: CachedDb) { - let mut cache = self.borrow_mut(); - cache.write_db(db); + self.borrow_mut().write_db(db); } /// Remove a database entry from the cache by dbi. fn remove_dbi(&self, dbi: ffi::MDBX_dbi) { - let mut cache = self.borrow_mut(); - cache.remove_dbi(dbi); + self.borrow_mut().remove_dbi(dbi); + } + + fn take_cursor(&self, dbi: ffi::MDBX_dbi) -> Option<*mut ffi::MDBX_cursor> { + self.borrow_mut().take_cursor(dbi) + } + + fn return_cursor(&self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor) { + self.borrow_mut().return_cursor(dbi, cursor); + } + + fn drain_cursors(&self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { + self.borrow_mut().drain_cursors() + } + + fn drain_cursors_for_dbi(&self, dbi: ffi::MDBX_dbi) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { + self.borrow_mut().drain_cursors_for_dbi(dbi) + } + + #[cfg(test)] + fn cursor_count(&self) -> usize { + self.borrow().cursors.len() + } + + #[cfg(test)] + unsafe fn inject_cursor(&self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor) { + self.borrow_mut().cursors.push((dbi, cursor)); } } diff --git a/src/tx/cursor.rs b/src/tx/cursor.rs index 9c058e5..e24fa37 100644 --- a/src/tx/cursor.rs +++ b/src/tx/cursor.rs @@ -6,6 +6,7 @@ use crate::{ tx::{ TxPtrAccess, aliases::IterKeyVals, + cache::Cache, iter::{Iter, IterDup, IterDupFixed, IterDupFixedOfKey, IterDupOfKey}, kind::WriteMarker, }, @@ -51,6 +52,17 @@ where Ok(Self { access, cursor, db, _kind: PhantomData }) } + /// Wraps an existing raw cursor pointer with cache support. + /// + /// The cursor must already be bound to the correct transaction and DBI. + pub(crate) const fn from_raw( + access: &'tx K::Access, + cursor: *mut ffi::MDBX_cursor, + db: Database, + ) -> Self { + Self { access, cursor, db, _kind: PhantomData } + } + /// Helper function for `Clone`. This should only be invoked within /// a `with_txn_ptr` call to ensure safety. fn new_at_position(other: &Self) -> MdbxResult { @@ -58,12 +70,14 @@ where let cursor = ffi::mdbx_cursor_create(ptr::null_mut()); let res = ffi::mdbx_cursor_copy(other.cursor(), cursor); + if let Err(e) = mdbx_result(res) { + // Close directly — do NOT construct Self, as Drop would + // push this unbound cursor into the cache. + ffi::mdbx_cursor_close(cursor); + return Err(e); + } - let s = Self { access: other.access, cursor, db: other.db, _kind: PhantomData }; - - mdbx_result(res)?; - - Ok(s) + Ok(Self { access: other.access, cursor, db: other.db, _kind: PhantomData }) } } @@ -1072,11 +1086,10 @@ where K: TransactionKind, { fn drop(&mut self) { - // MDBX cursors MUST be closed. Failure to do so is a memory leak. - // - // To be able to close a cursor of a timed out transaction, we need to - // renew it first. Hence the usage of `with_txn_ptr_for_cleanup` here. - self.access.with_txn_ptr(|_| unsafe { ffi::mdbx_cursor_close(self.cursor) }); + // Return the cursor pointer to the transaction cache for reuse. + // The cache lives inside the access type, so the txn's Drop closes + // any leftover pointers before aborting. + self.access.cache().return_cursor(self.db.dbi(), self.cursor); } } diff --git a/src/tx/impl.rs b/src/tx/impl.rs index 70e3513..3ea1f9c 100644 --- a/src/tx/impl.rs +++ b/src/tx/impl.rs @@ -47,15 +47,15 @@ impl fmt::Debug for TxMeta { /// /// [`TxSync`]: crate::tx::aliases::TxSync /// [`TxUnsync`]: crate::tx::aliases::TxUnsync -pub struct Tx::Access> { +pub struct Tx::Access> { txn: U, - cache: K::Cache, - meta: TxMeta, + + _kind: core::marker::PhantomData, } -impl fmt::Debug for Tx { +impl fmt::Debug for Tx { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Tx").finish_non_exhaustive() } @@ -66,7 +66,11 @@ where K: TransactionKind>, { fn clone(&self) -> Self { - Self { txn: Arc::clone(&self.txn), cache: self.cache.clone(), meta: self.meta.clone() } + Self { + txn: Arc::clone(&self.txn), + meta: self.meta.clone(), + _kind: core::marker::PhantomData, + } } } @@ -75,8 +79,7 @@ impl Tx { pub(crate) fn from_access_and_env(txn: K::Access, env: Environment) -> Self { let span = K::new_span(txn.tx_id().unwrap_or_default()); let meta = TxMeta { env, span }; - let cache = K::Cache::default(); - Self { txn, cache, meta } + Self { txn, meta, _kind: core::marker::PhantomData } } /// Creates a new transaction wrapper from raw pointer and environment. @@ -156,7 +159,7 @@ where pub fn open_db(&self, name: Option<&str>) -> MdbxResult { let name_hash = CachedDb::hash_name(name); - if let Some(db) = self.cache.read_db(name_hash) { + if let Some(db) = self.txn.cache().read_db(name_hash) { return Ok(db); } @@ -174,7 +177,7 @@ where flags: DatabaseFlags, ) -> MdbxResult { let db = self.open_db_with_flags(name, flags)?; - self.cache.write_db(db); + self.txn.cache().write_db(db); Ok(db) } @@ -224,28 +227,82 @@ where /// Closes the database handle. /// + /// Any cached cursor pointers for this DBI are drained and closed + /// before the handle is closed. + /// /// # Safety /// /// This will invalidate data cached in [`Database`] instances with the /// DBI, and may result in bad behavior when using those instances after /// calling this function. pub unsafe fn close_db(&self, dbi: ffi::MDBX_dbi) -> MdbxResult<()> { + close_drained_cursors(&self.txn, self.txn.cache().drain_cursors_for_dbi(dbi)); // SAFETY: Caller ensures no other references exist. unsafe { ops::close_db_raw(self.meta.env.env_ptr(), dbi) }?; - self.cache.remove_dbi(dbi); + self.txn.cache().remove_dbi(dbi); Ok(()) } /// Opens a cursor on the given database. /// - /// Multiple cursors can be open simultaneously on different databases - /// within the same transaction. The cursor borrows the transaction's - /// inner access type, allowing concurrent cursor operations. + /// Cursors are transparently cached: dropped cursors return their + /// raw pointer to the cache, and subsequent calls reuse them without + /// a new `mdbx_cursor_open` allocation. Cached cursors are renewed + /// via `mdbx_cursor_renew` to reset their position. pub fn cursor(&self, db: Database) -> MdbxResult> { - Cursor::new(&self.txn, db) + let Some(raw) = self.txn.cache().take_cursor(db.dbi()) else { + return Cursor::new(&self.txn, db); + }; + self.with_txn_ptr(|txn_ptr| { + // SAFETY: txn_ptr is valid from with_txn_ptr. `raw` was produced + // by a prior Cursor::drop in this transaction. + match unsafe { ops::cursor_renew_raw(txn_ptr, raw) } { + Ok(()) => Ok(Cursor::from_raw(&self.txn, raw, db)), + Err(e) => { + // Renew failed — the cursor is in an indeterminate state. + // Close it defensively with the non-panicking variant and + // return the error. Do NOT re-insert into the cache, as + // a subsequent `cursor()` call would hit the same failure. + // SAFETY: within with_txn_ptr; close2 handles any cursor state. + unsafe { ops::cursor_close2_raw(raw) }; + Err(e) + } + } + }) + } + + /// Drains the cursor cache and closes all cached cursor pointers. + /// + /// Must be called before commit to ensure all cursors are closed while + /// the transaction is still valid. Abort flows rely on + /// `PtrSync::Drop` / `PtrUnsync::Drop` to drain instead. + fn drain_cached_cursors(&self) { + close_drained_cursors(&self.txn, self.txn.cache().drain_cursors()); } } +/// Closes a drained batch of cursor pointers via the ops module. +/// +/// This is a free function (rather than a method on [`Tx`]) so that the +/// `close_db` / `drop_db` paths and the access types' own `Drop` impls can +/// share a single closing routine, generic over the access type. +fn close_drained_cursors( + access: &U, + cursors: SmallVec<[*mut ffi::MDBX_cursor; 8]>, +) { + if cursors.is_empty() { + return; + } + access.with_txn_ptr(|_| { + for cursor in cursors { + // SAFETY: cursor pointers are valid — they were returned by + // Cursor::drop during the lifetime of this transaction, which + // is still alive within the with_txn_ptr block. + unsafe { ops::cursor_close_raw(cursor) }; + } + }); +} + // Write-only impl Tx { /// Opens a handle to an MDBX database, creating the database if necessary. @@ -459,19 +516,23 @@ impl Tx { /// Drops the database from the environment. /// + /// Any cached cursor pointers for this DBI are drained and closed + /// before the database is dropped. + /// /// # Safety /// /// Caller must ensure no [`Cursor`] or other references to the database /// exist. [`Database`] instances with the DBI will be invalidated, and /// use after calling this function may result in bad behavior. pub unsafe fn drop_db(&self, db: Database) -> MdbxResult<()> { + close_drained_cursors(&self.txn, self.txn.cache().drain_cursors_for_dbi(db.dbi())); self.with_txn_ptr(|txn| { // SAFETY: txn is a valid RW transaction pointer, caller ensures // no other references to dbi exist. unsafe { ops::drop_db_raw(txn, db.dbi()) } })?; - self.cache.remove_dbi(db.dbi()); + self.txn.cache().remove_dbi(db.dbi()); Ok(()) } @@ -489,6 +550,8 @@ where /// /// SAFETY: latency pointer must be valid for the duration of the commit. fn commit_inner(self, latency: *mut MDBX_commit_latency) -> MdbxResult<()> { + self.drain_cached_cursors(); + let was_aborted = self.with_txn_ptr(|txn| { if K::IS_READ_ONLY { mdbx_result(unsafe { ffi::mdbx_txn_commit_ex(txn, latency) }) @@ -542,6 +605,8 @@ where // span scope. let _guard = self.meta.span.clone().entered(); + self.drain_cached_cursors(); + // SAFETY: txn_ptr is valid from with_txn_ptr. let was_aborted = self.with_txn_ptr(|txn_ptr| unsafe { ops::commit_raw(txn_ptr, latency) })?; @@ -707,6 +772,135 @@ mod tests { assert_ne!(db1_a.dbi(), db2.dbi()); } + #[test] + fn test_cursor_cache_counts() { + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + let txn = TxUnsync::::begin(env.clone()).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + + assert_eq!(txn.txn.cache().cursor_count(), 0); + + { + let _c = txn.cursor(db).unwrap(); + assert_eq!(txn.txn.cache().cursor_count(), 0, "live cursor is not in cache"); + } + assert_eq!(txn.txn.cache().cursor_count(), 1, "dropped cursor returns to cache"); + + { + let _c = txn.cursor(db).unwrap(); + assert_eq!(txn.txn.cache().cursor_count(), 0, "second cursor reuses cached pointer"); + } + assert_eq!(txn.txn.cache().cursor_count(), 1, "re-dropped cursor returns to cache"); + + txn.commit().unwrap(); + } + + #[test] + fn test_cursor_cache_close_db_drains() { + let dir = tempdir().unwrap(); + let env = Environment::builder().set_max_dbs(4).open(dir.path()).unwrap(); + + // Create two DBs in a write txn and commit. + { + let txn = TxUnsync::::begin(env.clone()).unwrap(); + txn.create_db(Some("a"), DatabaseFlags::empty()).unwrap(); + txn.create_db(Some("b"), DatabaseFlags::empty()).unwrap(); + txn.commit().unwrap(); + } + + let txn = TxUnsync::::begin(env.clone()).unwrap(); + let db_a = txn.open_db(Some("a")).unwrap(); + let db_b = txn.open_db(Some("b")).unwrap(); + + // Populate the cache: one cursor per DBI. + drop(txn.cursor(db_a).unwrap()); + drop(txn.cursor(db_b).unwrap()); + assert_eq!(txn.txn.cache().cursor_count(), 2); + + // close_db on "a" must drain only that DBI's cursor. + // SAFETY: no live Cursor/Database instances for db_a remain. + unsafe { txn.close_db(db_a.dbi()).unwrap() }; + assert_eq!(txn.txn.cache().cursor_count(), 1, "close_db drains only its DBI"); + } + + #[test] + fn test_clone_drop_does_not_drain_shared_cache() { + // Regression for the bug evalir flagged on PR #11: dropping a + // `TxSync` clone must not drain the cursor cache shared with + // surviving clones. + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + let txn_a = RwTxSync::begin(env.clone()).unwrap(); + let db = txn_a.create_db(None, DatabaseFlags::empty()).unwrap(); + drop(txn_a.cursor(db).unwrap()); + assert_eq!(txn_a.txn.cache().cursor_count(), 1); + + let txn_b = txn_a.clone(); + drop(txn_b); + assert_eq!( + txn_a.txn.cache().cursor_count(), + 1, + "clone-drop must not drain the shared cursor cache" + ); + + // Surviving clone can still reuse the cached cursor. + drop(txn_a.cursor(db).unwrap()); + assert_eq!(txn_a.txn.cache().cursor_count(), 1); + } + + #[test] + fn test_concurrent_clone_drop_no_uaf() { + // Two threads each hold a clone and drop concurrently. Repeat to + // shake out any race in cache cleanup. With cache co-located in + // `PtrSync`, the cleanup runs exactly once when the last `Arc` + // drops — no race possible. + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + for _ in 0..200 { + let txn = RwTxSync::begin(env.clone()).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + // Populate cache with a few cursors. + drop(txn.cursor(db).unwrap()); + drop(txn.cursor(db).unwrap()); + drop(txn.cursor(db).unwrap()); + + let a = txn.clone(); + let b = txn.clone(); + drop(txn); // original + + let h1 = std::thread::spawn(move || drop(a)); + let h2 = std::thread::spawn(move || drop(b)); + h1.join().unwrap(); + h2.join().unwrap(); + } + } + + #[test] + fn test_cursor_renew_error_does_not_leak() { + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + let txn = TxUnsync::::begin(env.clone()).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + + // Inject a null pointer into the cache for db's DBI. The cursor() + // cache-hit path will call mdbx_cursor_renew on it, which returns + // MDBX_EINVAL for a null cursor; the error path closes via + // mdbx_cursor_close2 (also null-safe) and must not re-insert. + // SAFETY: the renew error path uses cursor_close2_raw which tolerates null. + unsafe { txn.txn.cache().inject_cursor(db.dbi(), ptr::null_mut()) }; + assert_eq!(txn.txn.cache().cursor_count(), 1); + + let err = txn.cursor(db).expect_err("renew must fail on null cursor"); + assert!(matches!(err, MdbxError::DecodeError), "expected EINVAL mapping, got {err:?}"); + assert_eq!(txn.txn.cache().cursor_count(), 0, "failed renew must not leak the pointer"); + + // A follow-up call falls through to the open path and succeeds, + // proving recovery via retry works. + let _c = txn.cursor(db).unwrap(); + } + fn __compile_checks() { fn assert_sync() {} assert_sync::(); diff --git a/src/tx/iter/dupfixed.rs b/src/tx/iter/dupfixed.rs index ef678a3..462af05 100644 --- a/src/tx/iter/dupfixed.rs +++ b/src/tx/iter/dupfixed.rs @@ -96,11 +96,12 @@ where Key: core::fmt::Debug, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let remaining_in_page = if self.value_size > 0 { - self.current_page.len().saturating_sub(self.page_offset) / self.value_size - } else { - 0 - }; + let remaining_in_page = self + .current_page + .len() + .saturating_sub(self.page_offset) + .checked_div(self.value_size) + .unwrap_or_default(); f.debug_struct("IterDupFixed") .field("exhausted", &self.exhausted) .field("value_size", &self.value_size) diff --git a/src/tx/iter/dupfixed_key.rs b/src/tx/iter/dupfixed_key.rs index 92149bb..c961e65 100644 --- a/src/tx/iter/dupfixed_key.rs +++ b/src/tx/iter/dupfixed_key.rs @@ -48,11 +48,12 @@ where K: TransactionKind, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let remaining_in_page = if self.value_size > 0 { - self.current_page.len().saturating_sub(self.page_offset) / self.value_size - } else { - 0 - }; + let remaining_in_page = self + .current_page + .len() + .saturating_sub(self.page_offset) + .checked_div(self.value_size) + .unwrap_or_default(); f.debug_struct("IterDupFixedOfKey") .field("exhausted", &self.exhausted) .field("value_size", &self.value_size) diff --git a/src/tx/kind.rs b/src/tx/kind.rs index 5e97f14..9802a49 100644 --- a/src/tx/kind.rs +++ b/src/tx/kind.rs @@ -1,13 +1,9 @@ -use std::{cell::RefCell, fmt::Debug, ptr, sync::Arc}; +use std::{fmt::Debug, ptr, sync::Arc}; use crate::{ Environment, MdbxResult, error::mdbx_result, - tx::{ - PtrSync, TxPtrAccess, - access::PtrUnsync, - cache::{Cache, DbCache, SharedCache}, - }, + tx::{PtrSync, TxPtrAccess, access::PtrUnsync}, }; use ffi::{MDBX_TXN_RDONLY, MDBX_TXN_READWRITE, MDBX_txn_flags_t}; @@ -86,31 +82,24 @@ pub trait SyncKind { /// The inner storage type for the transaction pointer. type Access: TxPtrAccess; - - /// Cache type used for this transaction kind. - type Cache: Cache + Send; } impl SyncKind for RoSync { const SYNC: bool = true; type Access = Arc; - type Cache = SharedCache; } impl SyncKind for RwSync { const SYNC: bool = true; type Access = Arc; - type Cache = SharedCache; } impl SyncKind for Ro { type Access = PtrUnsync; - type Cache = RefCell; } impl SyncKind for Rw { type Access = PtrUnsync; - type Cache = RefCell; } /// Marker trait for writable transaction kinds. diff --git a/src/tx/ops.rs b/src/tx/ops.rs index 8e583ce..18d564b 100644 --- a/src/tx/ops.rs +++ b/src/tx/ops.rs @@ -271,6 +271,63 @@ pub(crate) unsafe fn close_db_raw(env: *mut ffi::MDBX_env, dbi: ffi::MDBX_dbi) - Ok(()) } +/// Renews a cursor, binding it to the given transaction and resetting its +/// position. +/// +/// # Safety +/// +/// - `txn` must be a valid, non-null transaction pointer. +/// - `cursor` may be null or a valid cursor pointer. A null pointer returns +/// an error ([`MDBX_EINVAL`]) rather than undefined behaviour. +/// +/// [`MDBX_EINVAL`]: ffi::MDBX_EINVAL +#[inline(always)] +pub(crate) unsafe fn cursor_renew_raw( + txn: *mut ffi::MDBX_txn, + cursor: *mut ffi::MDBX_cursor, +) -> MdbxResult<()> { + // SAFETY: Caller guarantees txn is valid; MDBX null-checks the cursor. + mdbx_result(unsafe { ffi::mdbx_cursor_renew(txn, cursor) })?; + Ok(()) +} + +/// Closes a cursor, freeing its MDBX allocation. +/// +/// # Safety +/// +/// - `cursor` must be a valid, non-null cursor pointer. Passing null or an +/// invalid cursor causes an MDBX-internal abort. +/// - Must be called within a [`TxPtrAccess::with_txn_ptr`] block so the +/// cursor's transaction is still live. +/// +/// [`TxPtrAccess::with_txn_ptr`]: crate::tx::access::TxPtrAccess::with_txn_ptr +#[inline(always)] +pub(crate) unsafe fn cursor_close_raw(cursor: *mut ffi::MDBX_cursor) { + // SAFETY: Caller guarantees cursor is valid and non-null. + unsafe { ffi::mdbx_cursor_close(cursor) }; +} + +/// Closes a cursor without panicking on invalid input. +/// +/// Unlike [`cursor_close_raw`], this wraps `mdbx_cursor_close2` which +/// returns an error code instead of aborting the process when given a +/// null or otherwise invalid cursor. The return value is discarded since +/// this is only used on error paths where the outcome of the close is +/// irrelevant. +/// +/// # Safety +/// +/// - `cursor` may be null or a potentially-invalid cursor pointer. +/// - Must be called within a [`TxPtrAccess::with_txn_ptr`] block so the +/// cursor's transaction is still live. +/// +/// [`TxPtrAccess::with_txn_ptr`]: crate::tx::access::TxPtrAccess::with_txn_ptr +#[inline(always)] +pub(crate) unsafe fn cursor_close2_raw(cursor: *mut ffi::MDBX_cursor) { + // SAFETY: MDBX null-checks the cursor and returns an error on invalid input. + let _ = unsafe { ffi::mdbx_cursor_close2(cursor) }; +} + /// Checks if a memory pointer refers to dirty (modified) data. /// /// Returns `true` if the data is dirty and must be copied before borrowing. diff --git a/tests/cursor.rs b/tests/cursor.rs index c5d6cfa..619d9a4 100644 --- a/tests/cursor.rs +++ b/tests/cursor.rs @@ -1729,6 +1729,277 @@ fn test_put_multiple_empty_values_v2() { test_put_multiple_empty_values_impl(V2Factory::begin_rw, V2Factory::begin_ro); } +fn test_cursor_cache_reuse_impl( + begin_rw: impl Fn(&Environment) -> MdbxResult, + _begin_ro: impl Fn(&Environment) -> MdbxResult, +) where + RwTx: TestRwTxn, + RoTx: TestRoTxn, +{ + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txn = begin_rw(&env).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap(); + txn.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap(); + + // First cursor: open, use, drop (returns to cache) + { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key1"); + assert_eq!(&v, b"val1"); + } + + // Second cursor: should reuse cached pointer + { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key1"); + assert_eq!(&v, b"val1"); + + let (k, v) = cursor.next::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key2"); + assert_eq!(&v, b"val2"); + } + + txn.commit().unwrap(); +} + +#[test] +fn test_cursor_cache_reuse_v1() { + test_cursor_cache_reuse_impl(V1Factory::begin_rw, V1Factory::begin_ro); +} + +#[test] +fn test_cursor_cache_reuse_v2() { + test_cursor_cache_reuse_impl(V2Factory::begin_rw, V2Factory::begin_ro); +} + +fn test_cursor_cache_multiple_impl( + begin_rw: impl Fn(&Environment) -> MdbxResult, + _begin_ro: impl Fn(&Environment) -> MdbxResult, +) where + RwTx: TestRwTxn, + RoTx: TestRoTxn, +{ + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txn = begin_rw(&env).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"a", b"1", WriteFlags::empty()).unwrap(); + txn.put(db, b"b", b"2", WriteFlags::empty()).unwrap(); + txn.put(db, b"c", b"3", WriteFlags::empty()).unwrap(); + + // Open two cursors, drop both (both return to cache) + { + let _c1 = txn.cursor(db).unwrap(); + let _c2 = txn.cursor(db).unwrap(); + } + + // Open two again — both should reuse cached pointers + { + let mut c1 = txn.cursor(db).unwrap(); + let mut c2 = txn.cursor(db).unwrap(); + + let (k1, _) = c1.first::, Vec>().unwrap().unwrap(); + let (k2, _) = c2.last::, Vec>().unwrap().unwrap(); + assert_eq!(&k1, b"a"); + assert_eq!(&k2, b"c"); + } + + txn.commit().unwrap(); +} + +#[test] +fn test_cursor_cache_multiple_v1() { + test_cursor_cache_multiple_impl(V1Factory::begin_rw, V1Factory::begin_ro); +} + +#[test] +fn test_cursor_cache_multiple_v2() { + test_cursor_cache_multiple_impl(V2Factory::begin_rw, V2Factory::begin_ro); +} + +fn test_cursor_cache_repeated_cycles_impl( + begin_rw: impl Fn(&Environment) -> MdbxResult, + _begin_ro: impl Fn(&Environment) -> MdbxResult, +) where + RwTx: TestRwTxn, + RoTx: TestRoTxn, +{ + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txn = begin_rw(&env).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"key", b"val", WriteFlags::empty()).unwrap(); + + for _ in 0..100 { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key"); + assert_eq!(&v, b"val"); + } + + txn.commit().unwrap(); +} + +#[test] +fn test_cursor_cache_repeated_cycles_v1() { + test_cursor_cache_repeated_cycles_impl(V1Factory::begin_rw, V1Factory::begin_ro); +} + +#[test] +fn test_cursor_cache_repeated_cycles_v2() { + test_cursor_cache_repeated_cycles_impl(V2Factory::begin_rw, V2Factory::begin_ro); +} + +fn test_cursor_cache_reuse_ro_impl( + begin_rw: impl Fn(&Environment) -> MdbxResult, + begin_ro: impl Fn(&Environment) -> MdbxResult, +) where + RwTx: TestRwTxn, + RoTx: TestRoTxn, +{ + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + // Populate via RW, then commit + let txn = begin_rw(&env).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap(); + txn.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + + // Test cursor caching in RO txn + let txn = begin_ro(&env).unwrap(); + let db = txn.open_db(None).unwrap(); + + // First cursor: open, use, drop (returns to cache) + { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key1"); + assert_eq!(&v, b"val1"); + } + + // Second cursor: should reuse cached pointer + { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key1"); + assert_eq!(&v, b"val1"); + + let (k, v) = cursor.next::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key2"); + assert_eq!(&v, b"val2"); + } + + txn.commit().unwrap(); +} + +#[test] +fn test_cursor_cache_reuse_ro_v1() { + test_cursor_cache_reuse_ro_impl(V1Factory::begin_rw, V1Factory::begin_ro); +} + +#[test] +fn test_cursor_cache_reuse_ro_v2() { + test_cursor_cache_reuse_ro_impl(V2Factory::begin_rw, V2Factory::begin_ro); +} + +fn test_cursor_cache_repeated_cycles_ro_impl( + begin_rw: impl Fn(&Environment) -> MdbxResult, + begin_ro: impl Fn(&Environment) -> MdbxResult, +) where + RwTx: TestRwTxn, + RoTx: TestRoTxn, +{ + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txn = begin_rw(&env).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"key", b"val", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + + let txn = begin_ro(&env).unwrap(); + let db = txn.open_db(None).unwrap(); + + for _ in 0..100 { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key"); + assert_eq!(&v, b"val"); + } + + txn.commit().unwrap(); +} + +#[test] +fn test_cursor_cache_repeated_cycles_ro_v1() { + test_cursor_cache_repeated_cycles_ro_impl(V1Factory::begin_rw, V1Factory::begin_ro); +} + +#[test] +fn test_cursor_cache_repeated_cycles_ro_v2() { + test_cursor_cache_repeated_cycles_ro_impl(V2Factory::begin_rw, V2Factory::begin_ro); +} + +fn test_cursor_cache_reuse_across_writes_impl( + begin_rw: impl Fn(&Environment) -> MdbxResult, + _begin_ro: impl Fn(&Environment) -> MdbxResult, +) where + RwTx: TestRwTxn, + RoTx: TestRoTxn, +{ + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txn = begin_rw(&env).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap(); + + // cursor -> read -> drop (returns to cache) + { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key1"); + assert_eq!(&v, b"val1"); + } + + // Write new data (B-tree COW) + txn.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap(); + + // cursor (from cache) -> read -> should see updated data + { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.last::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key2"); + assert_eq!(&v, b"val2"); + + // Verify both entries visible + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key1"); + assert_eq!(&v, b"val1"); + } + + txn.commit().unwrap(); +} + +#[test] +fn test_cursor_cache_reuse_across_writes_v1() { + test_cursor_cache_reuse_across_writes_impl(V1Factory::begin_rw, V1Factory::begin_ro); +} + +#[test] +fn test_cursor_cache_reuse_across_writes_v2() { + test_cursor_cache_reuse_across_writes_impl(V2Factory::begin_rw, V2Factory::begin_ro); +} + // Release-build test: verify runtime error instead of panic #[cfg(not(debug_assertions))] #[test]