Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,28 @@ be mediated via the `TxAccess` trait.

## API Patterns

### Cursor Creation
### Cursor Creation and Caching

```rust
let db = txn.open_db(None).unwrap(); // Returns Database (has dbi + flags)
let cursor = txn.cursor(db).unwrap(); // Takes Database, NOT raw dbi
```

Cursors are transparently cached within transactions. When a cursor is
dropped, its raw pointer is returned to the transaction's cache. Subsequent
`cursor()` calls reuse cached pointers, avoiding `mdbx_cursor_open`/
`mdbx_cursor_close` overhead (~100 ns per cycle). The cache is drained
and all pointers closed on commit or abort.

`DbCache` (in `src/tx/cache.rs`) stores raw `*mut ffi::MDBX_cursor`
pointers, which makes it `!Send + !Sync` by default. Explicit `unsafe impl
Send + Sync for DbCache` is required because:
- `SyncKind::Cache` requires `Cache + Send` (for `RefCell<DbCache>: Send`)
- `SharedCache` uses `Arc<RwLock<DbCache>>` which requires `DbCache: Send + Sync`
- This is sound because `Cursor` itself is already `unsafe impl Send + Sync`,
and all access to cached pointers is mediated by `RefCell` (unsync) or
`RwLock` (sync)

### Database Flags Validation

DUP_SORT/DUP_FIXED methods validate flags at runtime:
Expand Down
78 changes: 67 additions & 11 deletions src/tx/access.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
use crate::{
Environment,
sys::txn_manager::{Abort, RawTxPtr},
tx::{
cache::{Cache, DbCache},
ops,
},
};
use core::fmt;
use parking_lot::{Mutex, MutexGuard};
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
mpsc::sync_channel,
use std::{
cell::RefCell,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
mpsc::sync_channel,
},
};
use tracing::debug_span;

Expand All @@ -29,8 +36,16 @@ mod sealed {
/// are stored for read-only and read-write transactions. It ensures that
/// the transaction pointer can be accessed safely, respecting timeouts
/// and ownership semantics.
///
/// The associated [`Cache`] type co-locates the cursor cache with the
/// transaction so that the cache lifetime is bound to the transaction
/// pointer's: cached cursors are drained and closed exactly once, in the
/// implementing type's `Drop`, before the transaction is aborted.
#[allow(unreachable_pub)]
pub trait TxPtrAccess: fmt::Debug + sealed::Sealed {
/// Cache type co-located with this transaction pointer.
type Cache: Cache;

/// Create an instance of the implementing type from a raw transaction
/// pointer.
fn from_ptr_and_env(ptr: *mut ffi::MDBX_txn, env: Environment, is_read_only: bool) -> Self
Expand All @@ -45,6 +60,10 @@ pub trait TxPtrAccess: fmt::Debug + sealed::Sealed {
/// Mark the transaction as committed.
fn mark_committed(&self);

/// Returns a reference to the cursor/database cache associated with
/// this transaction.
fn cache(&self) -> &Self::Cache;

/// Get the transaction ID by making a call into the MDBX C API.
fn tx_id(&self) -> Option<usize> {
let mut id = 0;
Expand All @@ -60,6 +79,8 @@ impl<T> TxPtrAccess for Arc<T>
where
T: TxPtrAccess,
{
type Cache = T::Cache;

fn from_ptr_and_env(ptr: *mut ffi::MDBX_txn, env: Environment, is_read_only: bool) -> Self
where
Self: Sized,
Expand All @@ -77,12 +98,17 @@ where
fn mark_committed(&self) {
self.as_ref().mark_committed();
}

fn cache(&self) -> &Self::Cache {
self.as_ref().cache()
}
}

/// Wrapper for raw txn pointer for RW transactions.
/// Wrapper for raw txn pointer for unsynchronized transactions.
pub struct PtrUnsync {
committed: AtomicBool,
ptr: *mut ffi::MDBX_txn,
cache: RefCell<DbCache>,
}

impl fmt::Debug for PtrUnsync {
Expand All @@ -92,11 +118,13 @@ impl fmt::Debug for PtrUnsync {
}

impl TxPtrAccess for PtrUnsync {
type Cache = RefCell<DbCache>;

fn from_ptr_and_env(ptr: *mut ffi::MDBX_txn, _env: Environment, _is_read_only: bool) -> Self
where
Self: Sized,
{
Self { committed: AtomicBool::new(false), ptr }
Self { committed: AtomicBool::new(false), ptr, cache: RefCell::new(DbCache::default()) }
}

fn with_txn_ptr<F, R>(&self, f: F) -> R
Expand All @@ -111,13 +139,24 @@ impl TxPtrAccess for PtrUnsync {
// Type is neither Sync nor Send, so no concurrent access is possible.
unsafe { *self.committed.as_ptr() = true };
}

fn cache(&self) -> &Self::Cache {
&self.cache
}
}

impl Drop for PtrUnsync {
fn drop(&mut self) {
// SAFETY:
// We have exclusive ownership of this pointer.
unsafe {
// Close any cached cursors before the txn ends. No-op if
// commit/abort already drained.
for cursor in self.cache.get_mut().drain_cursors() {
// SAFETY: cursor was returned from an earlier Cursor::drop
// bound to this still-live txn.
ops::cursor_close_raw(cursor);
}
if !*self.committed.as_ptr() {
ffi::mdbx_txn_abort(self.ptr);
}
Expand All @@ -139,9 +178,10 @@ pub struct PtrSync {
/// Whether the transaction was committed.
committed: AtomicBool,

/// Contains a lock to ensure exclusive access to the transaction.
/// The inner boolean indicates the timeout status.
lock: Mutex<()>,
/// Lock that serialises access to the transaction pointer **and**
/// guards the cursor/database cache. Held inside `with_txn_ptr`; also
/// taken implicitly via the `Cache` impl on `Mutex<DbCache>`.
lock: Mutex<DbCache>,

/// The environment that owns the transaction.
env: Environment,
Expand All @@ -159,7 +199,7 @@ unsafe impl Sync for PtrSync {}
impl PtrSync {
/// Acquires the inner transaction lock to guarantee exclusive access to the transaction
/// pointer.
pub(crate) fn lock(&self) -> MutexGuard<'_, ()> {
pub(crate) fn lock(&self) -> MutexGuard<'_, DbCache> {
if let Some(lock) = self.lock.try_lock() {
lock
} else {
Expand All @@ -176,13 +216,15 @@ impl PtrSync {
}

impl TxPtrAccess for PtrSync {
type Cache = Mutex<DbCache>;

fn from_ptr_and_env(ptr: *mut ffi::MDBX_txn, env: Environment, is_read_only: bool) -> Self
where
Self: Sized,
{
Self {
committed: AtomicBool::new(false),
lock: Mutex::new(()),
lock: Mutex::new(DbCache::default()),
txn: ptr,
env,
is_read_only,
Expand All @@ -200,10 +242,24 @@ impl TxPtrAccess for PtrSync {
fn mark_committed(&self) {
self.committed.store(true, Ordering::SeqCst);
}

fn cache(&self) -> &Self::Cache {
&self.lock
}
}

impl Drop for PtrSync {
fn drop(&mut self) {
// Close any cached cursors before the transaction ends. Runs
// exactly once: this `Drop` only fires when the last `Arc<PtrSync>`
// is released, so there is no race with surviving `TxSync` clones.
// No-op if `commit_inner` already drained the cache.
for cursor in self.lock.get_mut().drain_cursors() {
// SAFETY: cursor was returned from an earlier Cursor::drop
// bound to this still-live txn.
unsafe { ops::cursor_close_raw(cursor) };
}

if self.committed.load(Ordering::SeqCst) {
return;
}
Expand Down
Loading