diff --git a/Dockerfile b/Dockerfile index 63c4bb6..dc4dedd 100644 --- a/Dockerfile +++ b/Dockerfile @@ -66,7 +66,7 @@ useradd -r -g hyperliquid -u 10001 -d /home/hyperliquid -s /bin/bash hyperliquid # Create base directory structure install -d -m 755 -o root -g root /opt/hl -install -d -m 755 -o hyperliquid -g hyperliquid /home/hyperliquid /data /data/hl /data/hl/data /opt/hl/bin +install -d -m 755 -o hyperliquid -g hyperliquid /home/hyperliquid /data /data/hl /data/hl/data /data/snapshots /opt/hl/bin ln -s /data/hl /home/hyperliquid/hl EOF @@ -79,6 +79,7 @@ USER hyperliquid:hyperliquid VOLUME /opt/hl/bin VOLUME /data +VOLUME /data/snapshots WORKDIR /data # Import Hypqliquid public key. This is also required by hl-visor to verify downloaded binaries @@ -95,10 +96,17 @@ ENV HL_BOOTSTRAP_OVERRIDE_GOSSIP_CONFIG_MAX_AGE=15m ENV HL_BOOTSTRAP_SEED_PEERS_AMOUNT=5 ENV HL_BOOTSTRAP_SEED_PEERS_MAX_LATENCY=80ms ENV HL_BOOTSTRAP_NETWORK=Mainnet +ENV HL_BOOTSTRAP_METRICS_LISTEN_ADDRESS=0.0.0.0:2112 +ENV HL_BOOTSTRAP_SNAPSHOT_SERVER_LISTEN_ADDRESS=0.0.0.0:2113 # RPC EXPOSE 3001/tcp # P2P EXPOSE 4000-4010/tcp +# Metrics +EXPOSE 2112/tcp +# Snapshot server +EXPOSE 2113/tcp + ENTRYPOINT ["/usr/bin/catatonit", "--", "hl-bootstrap", "--override-gossip-config-path=/data/override_gossip_config.json", "--"] CMD ["run-non-validator", "--write-trades", "--write-fills", "--write-order-statuses", "--serve-eth-rpc", "--serve-info", "--disable-output-file-buffering"] diff --git a/compose.yaml b/compose.yaml index fda6c37..2d6b148 100644 --- a/compose.yaml +++ b/compose.yaml @@ -28,6 +28,7 @@ services: volumes: - "node-bin:/opt/hl/bin" - "node-data:/data" + - "node-snapshot:/data/snapshot" ports: - "127.0.0.1:3001:3001" - "0.0.0.0:4000-4010:4000-4010" @@ -41,3 +42,4 @@ services: volumes: node-bin: {} node-data: {} + node-snapshot: {} diff --git a/hl-bootstrap/Cargo.lock b/hl-bootstrap/Cargo.lock index 13009be..38a2691 100644 --- a/hl-bootstrap/Cargo.lock +++ b/hl-bootstrap/Cargo.lock @@ -90,6 +90,7 @@ checksum = "021e862c184ae977658b36c4500f7feac3221ca5da43e3f25bd04ab6c79a29b5" dependencies = [ "axum-core", "bytes", + "form_urlencoded", "futures-util", "http", "http-body", @@ -104,6 +105,8 @@ dependencies = [ "pin-project-lite", "rustversion", "serde", + "serde_path_to_error", + "serde_urlencoded", "sync_wrapper", "tokio", "tower", @@ -315,6 +318,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + [[package]] name = "futures-task" version = "0.3.31" @@ -388,8 +397,10 @@ dependencies = [ "structstruck", "tempfile", "tokio", + "tokio-util", "tracing", "tracing-subscriber", + "uuid", "which", ] @@ -1157,18 +1168,28 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "serde" -version = "1.0.219" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", @@ -1187,6 +1208,17 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" +dependencies = [ + "itoa", + "serde", + "serde_core", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -1412,6 +1444,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-util" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "tower" version = "0.5.2" @@ -1559,6 +1604,18 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a" +dependencies = [ + "getrandom 0.3.3", + "js-sys", + "serde_core", + "wasm-bindgen", +] + [[package]] name = "valuable" version = "0.1.1" diff --git a/hl-bootstrap/Cargo.toml b/hl-bootstrap/Cargo.toml index d766b95..f78675c 100644 --- a/hl-bootstrap/Cargo.toml +++ b/hl-bootstrap/Cargo.toml @@ -7,6 +7,7 @@ edition = "2024" axum = { version = "0.8.4", default-features = false, features = [ "tokio", "http1", + "query", ] } clap = { version = "4.5.41", features = ["env", "derive"] } duration-string = "0.5.2" @@ -28,6 +29,8 @@ tokio = { version = "1.46.1", features = [ "rt", "rt-multi-thread", ] } +tokio-util = { version = "0.7.18", features = ["io"] } tracing = "0.1.41" tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } +uuid = { version = "1.19.0", features = ["serde", "v7"] } which = { version = "8.0.0", features = ["tracing"] } diff --git a/hl-bootstrap/src/axum_ext.rs b/hl-bootstrap/src/axum_ext.rs new file mode 100644 index 0000000..60247c3 --- /dev/null +++ b/hl-bootstrap/src/axum_ext.rs @@ -0,0 +1,31 @@ +use axum::response::{IntoResponse, Response}; +use http::StatusCode; +use tracing::error; + +pub struct Report(eyre::Report); + +pub type HttpResult = eyre::Result; + +impl std::fmt::Debug for Report { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + +impl From for Report +where + E: Into, +{ + fn from(err: E) -> Self { + Self(err.into()) + } +} + +impl IntoResponse for Report { + fn into_response(self) -> Response { + let err = self.0; + + error!(?err, "http handler error"); + (StatusCode::INTERNAL_SERVER_ERROR, format!("{err:?}")).into_response() + } +} diff --git a/hl-bootstrap/src/main.rs b/hl-bootstrap/src/main.rs index 6edcdd8..904a285 100644 --- a/hl-bootstrap/src/main.rs +++ b/hl-bootstrap/src/main.rs @@ -8,6 +8,7 @@ use std::{ os::unix::process::CommandExt, path::PathBuf, process::Command, + time::Duration, }; use clap::Parser; @@ -23,10 +24,12 @@ use tracing_subscriber::{ util::SubscriberInitExt, }; +mod axum_ext; mod hl_gossip_config; mod hl_visor; mod monitor; mod prune; +mod snapshot; mod speedtest; mod sysctl; @@ -134,10 +137,22 @@ struct Cli { #[arg(long, env = "HL_BOOTSTRAP_NETWORK", default_value_t = HyperliquidChain::Mainnet)] network: HyperliquidChain, + /// fileSnapshot request server listen address + #[arg(long, env = "HL_BOOTSTRAP_SNAPSHOT_SERVER_LISTEN_ADDRESS")] + snapshot_server_listen_address: Option, + /// Free form args to execute after the setup args: Vec, } +impl Cli { + fn should_keep_bootstrap_running(&self) -> bool { + self.prune_data_interval.is_some() + || self.metrics_listen_address.is_some() + || self.snapshot_server_listen_address.is_some() + } +} + fn main() -> eyre::Result<()> { let args = Cli::parse(); @@ -168,9 +183,7 @@ fn main() -> eyre::Result<()> { trace!(?args, "args"); - let use_mt = args.prune_data_interval.is_some() || args.metrics_listen_address.is_some(); - - let runtime = if use_mt { + let runtime = if args.should_keep_bootstrap_running() { Builder::new_multi_thread() } else { Builder::new_current_thread() @@ -192,7 +205,7 @@ fn main() -> eyre::Result<()> { fn run_node(rt: Runtime, args: &Cli) -> eyre::Result<()> { info!(args = ?args.args, "setup done, executing hl-visor"); - if args.prune_data_interval.is_none() && args.metrics_listen_address.is_none() { + if !args.should_keep_bootstrap_running() { drop(rt); // Just exec into the child @@ -205,11 +218,12 @@ fn run_node(rt: Runtime, args: &Cli) -> eyre::Result<()> { let data_directory = current_dir().wrap_err("failed to get current working directory")?; let _prune_task = args.prune_data_interval.map(|prune_interval| { + let hl_data_directory = data_directory.clone().join("hl/data"); rt.spawn({ let prune_data_older_than = args.prune_data_older_than; prune_worker_task( - data_directory, + hl_data_directory, prune_interval.into(), prune_data_older_than.into(), ) @@ -235,6 +249,29 @@ fn run_node(rt: Runtime, args: &Cli) -> eyre::Result<()> { }) }); + let _snapshot_server = args.snapshot_server_listen_address.map(|address| { + let snapshot_directory = data_directory.clone().join("snapshots"); + rt.spawn(async move { + info!(%address, "starting snapshot server"); + if let Err(err) = + crate::snapshot::server::run_snapshot_server(snapshot_directory, address).await + { + error!(?err, "failed to start snapshot server") + } + }) + }); + + let _snapshot_prune_task = args.snapshot_server_listen_address.is_some().then(|| { + let snapshot_directory = data_directory.join("snapshots"); + rt.spawn(async move { + // Snapshots are rather big, prune them more often + let prune_interval = Duration::from_secs(30); + let prune_data_older_than = Duration::from_secs(15); + + prune_worker_task(snapshot_directory, prune_interval, prune_data_older_than) + }); + }); + let mut child = Command::new("hl-visor") .args(&args.args) .spawn() diff --git a/hl-bootstrap/src/prune.rs b/hl-bootstrap/src/prune.rs index 74f590a..a6dc142 100644 --- a/hl-bootstrap/src/prune.rs +++ b/hl-bootstrap/src/prune.rs @@ -11,7 +11,7 @@ pub async fn prune_worker_task>( prune_interval: Duration, prune_older_than: Duration, ) { - let base_path = base_path.as_ref().join("hl/data"); + let base_path = base_path.as_ref(); let mut interval = interval(prune_interval); interval.set_missed_tick_behavior(MissedTickBehavior::Skip); diff --git a/hl-bootstrap/src/snapshot/mod.rs b/hl-bootstrap/src/snapshot/mod.rs new file mode 100644 index 0000000..259fd85 --- /dev/null +++ b/hl-bootstrap/src/snapshot/mod.rs @@ -0,0 +1,56 @@ +use std::path::{Path, PathBuf}; + +use serde::{Deserialize, Serialize}; +use serde_json::json; +use uuid::Uuid; + +pub mod server; + +#[derive(Debug, Deserialize, Serialize)] +#[serde(tag = "type")] +pub enum FileSnapshotType { + #[serde(rename = "l4Snapshots")] + L4Snapshots { + #[serde(rename = "includeUsers", default)] + include_users: bool, + + #[serde(rename = "includeTriggerOrders", default)] + include_trigger_orders: bool, + }, + #[serde(rename = "referrerStates")] + ReferrerStates, +} + +impl FileSnapshotType { + pub fn type_name(&self) -> &'static str { + match self { + Self::L4Snapshots { .. } => "l4Snapshots", + Self::ReferrerStates => "referrerStates", + } + } +} + +pub fn create_file_snapshot_path( + base: impl AsRef, + snapshot_request: &FileSnapshotType, +) -> PathBuf { + let snapshot_id = Uuid::now_v7(); + base.as_ref().join(format!( + "{}_{}.json", + snapshot_request.type_name(), + snapshot_id + )) +} + +pub fn create_file_snapshot_payload( + snapshot_request: &FileSnapshotType, + include_height_in_output: bool, + out_path: impl AsRef, +) -> serde_json::Value { + json!({ + "type": "fileSnapshot", + "request": snapshot_request, + "includeHeightInOutput": include_height_in_output, + "outPath": out_path.as_ref().to_string_lossy(), + }) +} diff --git a/hl-bootstrap/src/snapshot/server.rs b/hl-bootstrap/src/snapshot/server.rs new file mode 100644 index 0000000..81c84ce --- /dev/null +++ b/hl-bootstrap/src/snapshot/server.rs @@ -0,0 +1,92 @@ +use std::net::SocketAddr; +use std::path::{Path, PathBuf}; +use std::sync::LazyLock; +use std::time::Duration; + +use axum::body::Body; +use axum::extract::Query; +use axum::http::header::CONTENT_TYPE; +use axum::response::IntoResponse; +use axum::routing::get; +use axum::{Router, extract::State}; +use reqwest::{Client, ClientBuilder, StatusCode}; +use serde::Deserialize; +use tokio::fs::File; +use tokio::net::TcpListener; +use tokio_util::io::ReaderStream; + +use crate::axum_ext::HttpResult; + +static CLIENT: LazyLock = LazyLock::new(|| ClientBuilder::new().build().unwrap()); + +#[derive(Clone)] +struct SnapshotServer { + snapshot_directory: PathBuf, +} + +fn router() -> Router { + Router::new().route("/snapshot", get(snapshot)) +} + +#[derive(Debug, Deserialize)] +struct SnapshotRequest { + #[serde(flatten)] + snapshot: super::FileSnapshotType, + + #[serde( + rename = "includeHeightInOutput", + default = "default_include_height_in_output" + )] + include_height_in_output: bool, +} + +fn default_include_height_in_output() -> bool { + true +} + +async fn snapshot( + State(state): State, + Query(SnapshotRequest { + snapshot, + include_height_in_output, + }): Query, +) -> HttpResult { + let snapshot_path = super::create_file_snapshot_path(&state.snapshot_directory, &snapshot); + let payload = + super::create_file_snapshot_payload(&snapshot, include_height_in_output, &snapshot_path); + + let _response = CLIENT + .post("http://127.0.0.1:3001/info") + .json(&payload) + .send() + .await? + .error_for_status()?; + + // TODO: assert response + + // TODO: unsure if this is needed + tokio::time::sleep(Duration::from_millis(20)).await; + + let stream = ReaderStream::new(File::open(snapshot_path).await?); + + Ok(( + StatusCode::OK, + [(CONTENT_TYPE, "application/json")], + Body::from_stream(stream), + ) + .into_response()) +} + +pub async fn run_snapshot_server( + snapshot_directory: impl AsRef, + listen_address: SocketAddr, +) -> eyre::Result<()> { + let state = SnapshotServer { + snapshot_directory: snapshot_directory.as_ref().into(), + }; + + let listener = TcpListener::bind(listen_address).await?; + axum::serve(listener, router().with_state(state).into_make_service()).await?; + + Ok(()) +}