Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions crates/cold/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ pub enum ColdStorageError {
#[error("stream deadline exceeded")]
StreamDeadlineExceeded,

/// A non-streaming read operation exceeded its deadline.
///
/// The task enforces a wall-clock limit on in-task read handlers so
/// that a stuck backend call cannot indefinitely hold a concurrency
/// permit and block the drain-before-write barrier. Operations that
/// legitimately take longer than this deadline should use
/// [`StreamLogs`](crate::ColdReadRequest::StreamLogs) instead.
#[error("cold read deadline exceeded")]
Timeout,

/// A reorg was detected during a streaming operation.
///
/// The anchor block hash changed between chunks, indicating that
Expand Down
4 changes: 3 additions & 1 deletion crates/cold/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@
mod error;
pub use error::{ColdResult, ColdStorageError};
mod request;
pub use request::{AppendBlockRequest, ColdReadRequest, ColdWriteRequest, Responder};
pub use request::{
AppendBlockRequest, ColdReadRequest, ColdWriteRequest, PermittedReadRequest, Responder,
};
mod specifier;
pub use alloy::rpc::types::{Filter, Log as RpcLog};
pub use signet_storage_types::{Confirmed, Recovered};
Expand Down
44 changes: 43 additions & 1 deletion crates/cold/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
use alloy::primitives::BlockNumber;
use signet_storage_types::{DbSignetEvent, DbZenithHeader, RecoveredTx, SealedHeader};
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::sync::{OwnedSemaphorePermit, oneshot};

/// Response sender type alias that propagates Result types.
pub type Responder<T, E = ColdStorageError> = oneshot::Sender<Result<T, E>>;
Expand All @@ -24,6 +24,27 @@ pub struct AppendBlockRequest {
pub resp: Responder<()>,
}

/// A read request with an attached concurrency permit.
///
/// The permit is acquired on the handle side before sending, bounds
/// concurrent in-flight readers, and doubles as the drain-before-write
/// marker in the task runner. It is released when the spawned handler
/// completes (or panics, or is dropped on deadline expiry).
#[derive(Debug)]
pub struct PermittedReadRequest {
/// The concurrency permit, released when the handler future is dropped.
pub permit: OwnedSemaphorePermit,
/// The read request itself.
pub req: ColdReadRequest,
}

impl PermittedReadRequest {
/// Construct a new permitted request.
pub const fn new(permit: OwnedSemaphorePermit, req: ColdReadRequest) -> Self {
Self { permit, req }
}
}

/// Read requests for cold storage.
///
/// These requests are processed concurrently (up to 64 in flight).
Expand Down Expand Up @@ -139,6 +160,27 @@ pub enum ColdReadRequest {
},
}

impl ColdReadRequest {
/// Short static name of the request variant, for logging and metrics.
pub const fn variant_name(&self) -> &'static str {
match self {
Self::GetHeader { .. } => "GetHeader",
Self::GetHeaders { .. } => "GetHeaders",
Self::GetTransaction { .. } => "GetTransaction",
Self::GetTransactionsInBlock { .. } => "GetTransactionsInBlock",
Self::GetTransactionCount { .. } => "GetTransactionCount",
Self::GetReceipt { .. } => "GetReceipt",
Self::GetReceiptsInBlock { .. } => "GetReceiptsInBlock",
Self::GetSignetEvents { .. } => "GetSignetEvents",
Self::GetZenithHeader { .. } => "GetZenithHeader",
Self::GetZenithHeaders { .. } => "GetZenithHeaders",
Self::GetLogs { .. } => "GetLogs",
Self::StreamLogs { .. } => "StreamLogs",
Self::GetLatestBlock { .. } => "GetLatestBlock",
}
}
}

/// Write requests for cold storage.
///
/// These requests are processed sequentially to maintain ordering.
Expand Down
43 changes: 31 additions & 12 deletions crates/cold/src/task/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@

use crate::{
AppendBlockRequest, BlockData, ColdReadRequest, ColdReceipt, ColdResult, ColdStorageError,
ColdWriteRequest, Confirmed, Filter, HeaderSpecifier, LogStream, ReceiptSpecifier, RpcLog,
SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
ColdWriteRequest, Confirmed, Filter, HeaderSpecifier, LogStream, PermittedReadRequest,
ReceiptSpecifier, RpcLog, SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
};
use alloy::primitives::{B256, BlockNumber};
use signet_storage_types::{DbSignetEvent, DbZenithHeader, RecoveredTx, SealedHeader};
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
use std::{sync::Arc, time::Duration};
use tokio::sync::{Semaphore, mpsc, oneshot};

/// Map a [`mpsc::error::TrySendError`] to the appropriate
/// [`ColdStorageError`] variant.
Expand Down Expand Up @@ -52,22 +52,40 @@ fn map_dispatch_error<T>(e: mpsc::error::TrySendError<T>) -> ColdStorageError {
/// Multiple readers can query concurrently without blocking writes.
#[derive(Clone, Debug)]
pub struct ColdStorageReadHandle {
sender: mpsc::Sender<ColdReadRequest>,
sender: mpsc::Sender<PermittedReadRequest>,
sem: Arc<Semaphore>,
}

impl ColdStorageReadHandle {
/// Create a new read-only handle with the given sender.
pub(crate) const fn new(sender: mpsc::Sender<ColdReadRequest>) -> Self {
Self { sender }
/// Create a new read-only handle with the given sender and permit
/// pool.
pub(crate) const fn new(
sender: mpsc::Sender<PermittedReadRequest>,
sem: Arc<Semaphore>,
) -> Self {
Self { sender, sem }
}

/// Send a read request and wait for the response.
///
/// Acquires a concurrency permit before sending. The channel is
/// sized to match the permit pool, so once a permit is held, send
/// is guaranteed to have capacity.
async fn send<T>(
&self,
req: ColdReadRequest,
rx: oneshot::Receiver<ColdResult<T>>,
) -> ColdResult<T> {
self.sender.send(req).await.map_err(|_| ColdStorageError::Cancelled)?;
let permit = Arc::clone(&self.sem)
.acquire_owned()
.await
.map_err(|_| ColdStorageError::TaskTerminated)?;
self.sender.try_send(PermittedReadRequest::new(permit, req)).map_err(|e| match e {
mpsc::error::TrySendError::Full(_) => {
unreachable!("semaphore permit implies channel capacity")
}
mpsc::error::TrySendError::Closed(_) => ColdStorageError::TaskTerminated,
})?;
rx.await.map_err(|_| ColdStorageError::Cancelled)?
}

Expand Down Expand Up @@ -357,12 +375,13 @@ pub struct ColdStorageHandle {
}

impl ColdStorageHandle {
/// Create a new handle with the given senders.
/// Create a new handle with the given senders and permit pool.
pub(crate) const fn new(
read_sender: mpsc::Sender<ColdReadRequest>,
read_sender: mpsc::Sender<PermittedReadRequest>,
read_sem: Arc<Semaphore>,
write_sender: mpsc::Sender<ColdWriteRequest>,
) -> Self {
Self { reader: ColdStorageReadHandle::new(read_sender), write_sender }
Self { reader: ColdStorageReadHandle::new(read_sender, read_sem), write_sender }
}

/// Get a read-only handle that shares the read channel.
Expand Down
Loading
Loading