Skip to content
Merged
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ name = "zenith-builder-example"
path = "bin/builder.rs"

[dependencies]
init4-bin-base = { version = "0.18.0-rc.13", features = ["perms", "aws", "pylon"] }
init4-bin-base = { version = "0.18.0", features = ["perms", "aws", "pylon", "sse"] }

signet-constants = { version = "0.16.0-rc.17" }
signet-sim = { version = "0.16.0-rc.17" }
Expand Down
9 changes: 9 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ const TX_POLL_ERRORS_HELP: &str = "Transaction cache poll errors.";
const TXS_FETCHED: &str = "signet.builder.cache.txs_fetched";
const TXS_FETCHED_HELP: &str = "Transactions fetched per poll cycle.";

const SSE_RECONNECT_ATTEMPTS: &str = "signet.builder.cache.sse_reconnect_attempts";
const SSE_RECONNECT_ATTEMPTS_HELP: &str = "SSE transaction stream reconnect attempts.";

const BUNDLE_POLL_COUNT: &str = "signet.builder.cache.bundle_poll_count";
const BUNDLE_POLL_COUNT_HELP: &str = "Bundle cache poll attempts.";

Expand Down Expand Up @@ -148,6 +151,7 @@ static DESCRIPTIONS: LazyLock<()> = LazyLock::new(|| {
describe_counter!(TX_POLL_COUNT, TX_POLL_COUNT_HELP);
describe_counter!(TX_POLL_ERRORS, TX_POLL_ERRORS_HELP);
describe_histogram!(TXS_FETCHED, TXS_FETCHED_HELP);
describe_counter!(SSE_RECONNECT_ATTEMPTS, SSE_RECONNECT_ATTEMPTS_HELP);
describe_counter!(BUNDLE_POLL_COUNT, BUNDLE_POLL_COUNT_HELP);
describe_counter!(BUNDLE_POLL_ERRORS, BUNDLE_POLL_ERRORS_HELP);
describe_histogram!(BUNDLES_FETCHED, BUNDLES_FETCHED_HELP);
Expand Down Expand Up @@ -234,6 +238,11 @@ pub(crate) fn record_txs_fetched(count: usize) {
histogram!(TXS_FETCHED).record(count as f64);
}

/// Bump the counter tracking SSE transaction-stream reconnect attempts.
pub(crate) fn inc_sse_reconnect_attempts() {
    let reconnects = counter!(SSE_RECONNECT_ATTEMPTS);
    reconnects.increment(1);
}

/// Increment the bundle poll attempt counter.
pub(crate) fn inc_bundle_poll_count() {
counter!(BUNDLE_POLL_COUNT).increment(1);
Expand Down
2 changes: 1 addition & 1 deletion src/tasks/cache/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl CacheTasks {
/// [`CacheTask`], [`TxPoller`], and [`BundlePoller`] internally and yields their [`JoinHandle`]s.
pub fn spawn(&self) -> CacheSystem {
// Tx Poller pulls transactions from the cache
let tx_poller = TxPoller::new();
let tx_poller = TxPoller::new(self.block_env.clone());
let (tx_receiver, tx_poller) = tx_poller.spawn();

// Bundle Poller pulls bundles from the cache
Expand Down
227 changes: 161 additions & 66 deletions src/tasks/cache/tx.rs
Original file line number Diff line number Diff line change
@@ -1,59 +1,68 @@
//! Transaction service responsible for fetching and sending transactions to the simulator.
use crate::{config::BuilderConfig, tasks::env::SimEnv};
use alloy::{
    consensus::{Transaction, TxEnvelope, transaction::SignerRecoverable},
    providers::Provider,
};
use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder};
use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt};
use signet_tx_cache::{TxCache, TxCacheError};
use std::{ops::ControlFlow, pin::Pin, time::Duration};
use tokio::{
    sync::{mpsc, watch},
    task::JoinHandle,
    time,
};
use tracing::{Instrument, debug, debug_span, trace, trace_span, warn};

/// Boxed, pinned SSE stream of transactions (or cache errors) from the tx cache.
type SseStream = Pin<Box<dyn Stream<Item = Result<TxEnvelope, TxCacheError>> + Send>>;

/// First delay used when reconnecting the SSE stream after a failure.
const INITIAL_RECONNECT_BACKOFF: Duration = Duration::from_millis(250);
/// Ceiling for the exponential reconnect delay.
const MAX_RECONNECT_BACKOFF: Duration = Duration::from_secs(30);

/// Builds the SSE reconnect backoff iterator.
///
/// Yields delays of 250ms, 500ms, 1s, 2s, 4s, 8s, 16s, 30s, 30s, … — doubling
/// each step, capped at [`MAX_RECONNECT_BACKOFF`] and unbounded in count.
/// Retries are intentionally unbounded: the per-block-env full refetch in
/// [`TxPoller::task_future`] is the floor on correctness, so SSE is a
/// best-effort latency optimization on top of it.
fn reconnect_backoff() -> ExponentialBackoff {
ExponentialBuilder::default()
.with_factor(2.0)
.with_min_delay(INITIAL_RECONNECT_BACKOFF)
Comment thread
Evalir marked this conversation as resolved.
.with_max_delay(MAX_RECONNECT_BACKOFF)
.without_max_times()
.build()
}

/// Implements a poller for the block builder to pull transactions from the
/// transaction pool.
#[derive(Debug, Clone)]
/// Fetches transactions from the transaction pool on startup and on each
/// block environment change, and subscribes to an SSE stream for real-time
/// delivery of new transactions in between.
#[derive(Debug)]
pub struct TxPoller {
/// Config values from the Builder.
config: &'static BuilderConfig,
/// Client for the tx cache.
tx_cache: TxCache,
/// Defines the interval at which the service should poll the cache.
poll_interval_ms: u64,
/// Receiver for block environment updates, used to trigger refetches.
envs: watch::Receiver<Option<SimEnv>>,
/// SSE reconnect backoff. Reconnect attempts are intentionally unbounded:
/// this poller is a long-lived subscriber, so tx-cache outages should
/// degrade delivery until recovery rather than permanently disabling it.
reconnect_backoff: ExponentialBackoff,
}

/// [`TxPoller`] implements a poller task that fetches transactions from the transaction pool
/// and sends them into the provided channel sender.
impl TxPoller {
    /// Returns a new [`TxPoller`] with the given block environment receiver.
    ///
    /// Reads the global builder config for the tx-pool URL and starts the
    /// reconnect backoff from its initial schedule.
    pub fn new(envs: watch::Receiver<Option<SimEnv>>) -> Self {
        let config = crate::config();
        let tx_cache = TxCache::new(config.tx_pool_url.clone());
        Self { config, tx_cache, envs, reconnect_backoff: reconnect_backoff() }
    }

// Spawn a tokio task to check the nonce of a transaction before sending
// it to the cachetask via the outbound channel.
/// Spawn a tokio task to check the nonce of a transaction before sending
/// it to the cachetask via the outbound channel.
fn spawn_check_nonce(&self, tx: TxEnvelope, outbound: mpsc::UnboundedSender<ReceivedTx>) {
tokio::spawn(async move {
let span = debug_span!("check_nonce", tx_id = %tx.tx_hash());
Expand Down Expand Up @@ -95,46 +104,132 @@ impl TxPoller {
});
}

/// Polls the transaction cache for transactions, paginating through all available pages.
pub async fn check_tx_cache(&self) -> Result<Vec<TxEnvelope>, TxCacheError> {
/// Pulls every transaction currently in the cache, paginating until the
/// stream is exhausted. Pure fetch — no metrics, no dispatch.
async fn check_tx_cache(&self) -> Result<Vec<TxEnvelope>, TxCacheError> {
self.tx_cache.stream_transactions().try_collect().await
}

async fn task_future(self, outbound: mpsc::UnboundedSender<ReceivedTx>) {
loop {
let span = trace_span!("TxPoller::loop", url = %self.config.tx_pool_url);
/// Fetches all transactions from the cache and dispatches each one to
/// a nonce-check task. Records poll metrics around the fetch.
async fn fetch_and_dispatch(&self, outbound: &mpsc::UnboundedSender<ReceivedTx>) {
let span = trace_span!("TxPoller::fetch_and_dispatch", url = %self.config.tx_pool_url);

crate::metrics::inc_tx_poll_count();
let Ok(transactions) = self
.check_tx_cache()
.inspect_err(|error| {
crate::metrics::inc_tx_poll_errors();
debug!(%error, "Error fetching transactions");
})
.instrument(span.clone())
.await
else {
return;
};

let _guard = span.entered();
crate::metrics::record_txs_fetched(transactions.len());
trace!(count = transactions.len(), "found transactions");
for tx in transactions {
self.spawn_check_nonce(tx, outbound.clone());
}
}

// Check this here to avoid making the web request if we know
// we don't need the results.
if outbound.is_closed() {
span.in_scope(|| trace!("No receivers left, shutting down"));
break;
}
/// Opens an SSE subscription to the transaction feed. Returns an empty
/// stream on connection failure so the caller can handle reconnection
/// uniformly.
async fn subscribe(&self) -> SseStream {
self.tx_cache
.subscribe_transactions()
.await
.inspect(|_| debug!(url = %self.config.tx_pool_url, "SSE transaction subscription established"))
.inspect_err(|error| warn!(%error, "Failed to open SSE transaction subscription"))
.map(|s| Box::pin(s) as SseStream)
.unwrap_or_else(|_| Box::pin(futures_util::stream::empty()))
}

crate::metrics::inc_tx_poll_count();
if let Ok(transactions) = self
.check_tx_cache()
.inspect_err(|error| {
crate::metrics::inc_tx_poll_errors();
debug!(%error, "Error fetching transactions");
})
.instrument(span.clone())
.await
{
let _guard = span.entered();
crate::metrics::record_txs_fetched(transactions.len());
trace!(count = transactions.len(), "found transactions");
for tx in transactions.into_iter() {
self.spawn_check_nonce(tx, outbound.clone());
/// Reconnects the SSE stream with backoff. Performs a full refetch to
/// cover any items missed while disconnected.
async fn reconnect(&mut self, outbound: &mpsc::UnboundedSender<ReceivedTx>) -> SseStream {
crate::metrics::inc_sse_reconnect_attempts();
let delay = self.reconnect_backoff.next().expect("backoff is unbounded");
tokio::select! {
// Biased: a block env change wins over the backoff sleep. An env
// change triggers a full refetch below anyway, which supersedes the
// sleep-then-reconnect path — so there's no point waiting out the
// backoff.
biased;
_ = self.envs.changed() => {}
_ = time::sleep(delay) => {}
}
let (_, stream) = tokio::join!(self.fetch_and_dispatch(outbound), self.subscribe());
stream
}

/// Processes a single item yielded by the SSE stream: dispatches the tx
/// for nonce checking on success, or reconnects on error / stream end.
/// Returns `Break` when the outbound channel has closed and the task
/// should shut down.
async fn handle_sse_item(
&mut self,
item: Option<Result<TxEnvelope, TxCacheError>>,
outbound: &mpsc::UnboundedSender<ReceivedTx>,
stream: &mut SseStream,
) -> ControlFlow<()> {
match item {
Some(Ok(tx)) => {
if outbound.is_closed() {
trace!("No receivers left, shutting down");
return ControlFlow::Break(());
}
self.reconnect_backoff = reconnect_backoff();
self.spawn_check_nonce(tx, outbound.clone());
}
Some(Err(error)) => {
warn!(%error, "SSE transaction stream error, reconnecting");
*stream = self.reconnect(outbound).await;
}
None => {
warn!("SSE transaction stream ended, reconnecting");
*stream = self.reconnect(outbound).await;
}
}
ControlFlow::Continue(())
}

time::sleep(self.poll_duration()).await;
async fn task_future(mut self, outbound: mpsc::UnboundedSender<ReceivedTx>) {
// Initial full fetch of all currently-cached transactions, plus SSE
// subscription for real-time delivery.
let (_, mut sse_stream) =
tokio::join!(self.fetch_and_dispatch(&outbound), self.subscribe());

loop {
tokio::select! {
item = sse_stream.next() => {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this arm can be collapsed a bit by adding stream combinators that transform the sse_stream into a future that only ever returns on ControlFlow::Break?

sketch:

let stream = sse_stream
   .then(|item| self.handle_sse_item(...))
   .skip_while(|flow| flow.is_continue)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

passing the &mut sse_stream as an arg break this tho

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but that's a bit of a code smell anyway, so it might be worth re-flowing to remove it by adding more information in the ControlFLow

if self
.handle_sse_item(item, &outbound, &mut sse_stream)
.await
.is_break()
{
break;
}
}
res = self.envs.changed() => {
if res.is_err() {
debug!("Block env channel closed, shutting down");
break;
}
debug!("Block env changed, refetching all transactions");
self.fetch_and_dispatch(&outbound).await;
}
}
}
}

/// Spawns a task that continuously polls the cache for transactions and sends any it finds to
/// its sender.
/// Spawns a task that fetches all current transactions, then subscribes
/// to the SSE feed for real-time updates, refetching on each new block
/// environment.
pub fn spawn(self) -> (mpsc::UnboundedReceiver<ReceivedTx>, JoinHandle<()>) {
let (outbound, inbound) = mpsc::unbounded_channel();
let jh = tokio::spawn(self.task_future(outbound));
Expand Down
13 changes: 5 additions & 8 deletions tests/tx_poller_test.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
#![cfg(feature = "test-utils")]

use alloy::{primitives::U256, signers::local::PrivateKeySigner};
use builder::{
tasks::cache::TxPoller,
test_utils::{new_signed_tx, setup_logging, setup_test_config},
};
use builder::test_utils::{new_signed_tx, setup_logging, setup_test_config};
use eyre::{Ok, Result};
use futures_util::TryStreamExt;
use signet_tx_cache::TxCache;

#[tokio::test]
async fn test_tx_roundtrip() -> Result<()> {
Expand All @@ -15,11 +14,9 @@ async fn test_tx_roundtrip() -> Result<()> {
// Post a transaction to the cache
post_tx().await?;

// Create a new poller
let poller = TxPoller::new();

// Fetch transactions from the pool
let transactions = poller.check_tx_cache().await?;
let tx_cache = TxCache::new(builder::config().tx_pool_url.clone());
let transactions: Vec<_> = tx_cache.stream_transactions().try_collect().await?;

// Ensure at least one transaction exists
assert!(!transactions.is_empty());
Expand Down