From 5a93e5e049a50c6374699f03005e5b309c869b61 Mon Sep 17 00:00:00 2001 From: Jim Hester Date: Fri, 1 May 2026 17:37:27 -0400 Subject: [PATCH 01/11] feat(deps): add sha2 and hex for HybridReader cache key derivation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds two small crates needed by the upcoming hybrid_cache module: sha2 for SHA-256 hashing of (reader_uri, sql) cache keys, hex for fixed-width hex encoding of the truncated digest. Both pull in only `generic-array`/`typenum`/`block-buffer` transitively — no large graphs. --- Cargo.lock | 2 ++ src/Cargo.toml | 2 ++ 2 files changed, 4 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 2dbe4d44b..2d135c2b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2079,6 +2079,7 @@ dependencies = [ "csscolorparser", "duckdb", "geozero", + "hex", "jsonschema", "libloading", "palette", @@ -2088,6 +2089,7 @@ dependencies = [ "rusqlite", "serde", "serde_json", + "sha2", "sprintf", "tempfile", "thiserror 1.0.69", diff --git a/src/Cargo.toml b/src/Cargo.toml index 264ae7061..c0db4b6e4 100644 --- a/src/Cargo.toml +++ b/src/Cargo.toml @@ -52,6 +52,8 @@ rand.workspace = true sprintf = "0.4" const_format.workspace = true uuid.workspace = true +sha2 = "0.10" +hex = "0.4" [dev-dependencies] jsonschema = { version = "0.44", default-features = false, features = ["resolve-file"] } From b74adb5c5a05d0c00f687fc33531e95085ec6e7d Mon Sep 17 00:00:00 2001 From: Jim Hester Date: Fri, 1 May 2026 17:39:44 -0400 Subject: [PATCH 02/11] feat(reader): add hybrid_cache module for query-result caching MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Generic cache primitives for HybridReader: SHA-256 cache-key derivation from (reader_uri, sql), DuckDB meta-table DDL, lookup/insert/touch/drop operations, and LRU eviction over a configurable byte budget. Reader- agnostic — the only 'remote' reference is via the caller-supplied reader_uri string. Tests cover key stability, separator-injection collision resistance, meta-table idempotency, INSERT-OR-REPLACE behavior on the cache_key PK, and the lookup/insert/touch/drop cycle. Module not yet wired into mod.rs — that lands together with the HybridReader cache integration in the next commit. --- src/reader/hybrid_cache.rs | 383 +++++++++++++++++++++++++++++++++++++ 1 file changed, 383 insertions(+) create mode 100644 src/reader/hybrid_cache.rs diff --git a/src/reader/hybrid_cache.rs b/src/reader/hybrid_cache.rs new file mode 100644 index 000000000..9e89bc043 --- /dev/null +++ b/src/reader/hybrid_cache.rs @@ -0,0 +1,383 @@ +//! Query-result caching for `HybridReader`. +//! +//! Hashes `(reader_uri, sql)` → stable per-query table name in the staging +//! DuckDB. See `hybrid.rs` docstring for the wider design. + +use arrow::array::Array; +use sha2::{Digest, Sha256}; + +use crate::array_util::{as_i64, as_str}; +use crate::reader::{DuckDBReader, Reader}; +use crate::Result; + +pub const META_TABLE: &str = "__ggsql_cache_meta__"; + +/// Runtime configuration for the query-result cache. +#[derive(Debug, Clone)] +pub struct CacheConfig { + /// Disable entirely if false — execute_sql always routes straight to + /// the primary reader, no meta-table touched. + pub enabled: bool, + /// Time-to-live in seconds. Entries older than this get evicted on + /// lookup and re-fetched. Default 300s. + pub ttl_secs: u64, + /// Cumulative byte budget across all cache entries. When exceeded + /// after an insert, LRU eviction fires. Default 512 MB. + pub max_bytes: u64, +} + +impl Default for CacheConfig { + fn default() -> Self { + let enabled = std::env::var("GGSQL_HYBRID_CACHE_DISABLED") + .ok() + .filter(|v| !v.is_empty() && v != "0") + .is_none(); + Self { + enabled, + ttl_secs: 300, + max_bytes: 512 * 1024 * 1024, + } + } +} + +/// Create the meta table if it does not already exist. DuckDB-specific +/// DDL; safe to call on every `HybridReader::new`. +pub fn ensure_meta(staging: &DuckDBReader) -> Result<()> { + let ddl = format!( + "CREATE TABLE IF NOT EXISTS {META_TABLE} ( + cache_key VARCHAR PRIMARY KEY, + reader_uri VARCHAR NOT NULL, + sql VARCHAR NOT NULL, + fetched_at_epoch_ms BIGINT NOT NULL, + last_accessed_epoch_ms BIGINT NOT NULL, + row_count BIGINT NOT NULL, + byte_estimate BIGINT NOT NULL + )" + ); + staging.execute_sql(&ddl).map(|_| ()) +} + +/// Stable cache-key derived from the remote reader's URI and the SQL text. +/// Uses SHA-256 hex truncated to 16 chars — 64 bits of key space, collision +/// odds negligible at any realistic cache size. +/// +/// Inputs are joined with a newline separator so a `\n`-containing URI can't +/// be confused with the SQL half of a different pair. +pub fn cache_key(reader_uri: &str, sql: &str) -> String { + let mut hasher = Sha256::new(); + hasher.update(reader_uri.as_bytes()); + hasher.update(b"\n"); + hasher.update(sql.as_bytes()); + let digest = hasher.finalize(); + hex::encode(&digest[..8]) // 16 hex chars = 64 bits +} + +/// Table name for a cached result. Kept out-of-band from user-visible +/// names by the `__ggsql_cache_` prefix, matching the convention used for +/// other framework tables (`__ggsql_aes_*`, `__ggsql_layer_*`). +pub fn cache_table_name(key: &str) -> String { + format!("__ggsql_cache_{}", key) +} + +#[derive(Debug, Clone)] +pub struct CacheEntry { + pub cache_key: String, + pub reader_uri: String, + pub sql: String, + pub fetched_at_epoch_ms: i64, + pub last_accessed_epoch_ms: i64, + pub row_count: i64, + pub byte_estimate: i64, +} + +pub fn now_ms() -> i64 { + use std::time::{SystemTime, UNIX_EPOCH}; + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as i64) + .unwrap_or(0) +} + +// SQL-escape a string for embedding in a DuckDB string literal. +fn esc(s: &str) -> String { + s.replace('\'', "''") +} + +pub fn lookup(staging: &DuckDBReader, key: &str) -> Result> { + let sql = format!( + "SELECT cache_key, reader_uri, sql, fetched_at_epoch_ms, + last_accessed_epoch_ms, row_count, byte_estimate + FROM {META_TABLE} WHERE cache_key = '{}'", + esc(key) + ); + let df = staging.execute_sql(&sql)?; + if df.height() == 0 { + return Ok(None); + } + let get_str = |col: &str| -> Result { + let s = df.column(col).map_err(|e| { + crate::GgsqlError::ReaderError(format!("cache lookup col {col}: {e}")) + })?; + let arr = as_str(s).map_err(|e| { + crate::GgsqlError::ReaderError(format!("cache lookup str {col}: {e}")) + })?; + if arr.is_empty() || arr.is_null(0) { + Ok(String::new()) + } else { + Ok(arr.value(0).to_string()) + } + }; + let get_i64 = |col: &str| -> Result { + let s = df.column(col).map_err(|e| { + crate::GgsqlError::ReaderError(format!("cache lookup col {col}: {e}")) + })?; + let arr = as_i64(s).map_err(|e| { + crate::GgsqlError::ReaderError(format!("cache lookup i64 {col}: {e}")) + })?; + if arr.is_empty() || arr.is_null(0) { + Ok(0) + } else { + Ok(arr.value(0)) + } + }; + Ok(Some(CacheEntry { + cache_key: get_str("cache_key")?, + reader_uri: get_str("reader_uri")?, + sql: get_str("sql")?, + fetched_at_epoch_ms: get_i64("fetched_at_epoch_ms")?, + last_accessed_epoch_ms: get_i64("last_accessed_epoch_ms")?, + row_count: get_i64("row_count")?, + byte_estimate: get_i64("byte_estimate")?, + })) +} + +pub fn insert_meta( + staging: &DuckDBReader, + key: &str, + reader_uri: &str, + sql: &str, + row_count: i64, + byte_estimate: i64, +) -> Result<()> { + // INSERT OR REPLACE so a stale row from a previous attempt (e.g. a + // partial failure where the cache table was registered but the meta + // row insert errored) gets cleanly overwritten on retry instead of + // raising a PK conflict on `cache_key`. + let now = now_ms(); + let dml = format!( + "INSERT OR REPLACE INTO {META_TABLE} + (cache_key, reader_uri, sql, fetched_at_epoch_ms, + last_accessed_epoch_ms, row_count, byte_estimate) + VALUES ('{}', '{}', '{}', {}, {}, {}, {})", + esc(key), + esc(reader_uri), + esc(sql), + now, + now, + row_count, + byte_estimate, + ); + staging.execute_sql(&dml).map(|_| ()) +} + +pub fn touch(staging: &DuckDBReader, key: &str) -> Result<()> { + let dml = format!( + "UPDATE {META_TABLE} SET last_accessed_epoch_ms = {} + WHERE cache_key = '{}'", + now_ms(), + esc(key) + ); + staging.execute_sql(&dml).map(|_| ()) +} + +pub fn drop_entry(staging: &DuckDBReader, key: &str) -> Result<()> { + let del = format!( + "DELETE FROM {META_TABLE} WHERE cache_key = '{}'", + esc(key) + ); + staging.execute_sql(&del)?; + let drop = format!("DROP TABLE IF EXISTS {}", cache_table_name(key)); + staging.execute_sql(&drop).map(|_| ()) +} + +pub fn clear_all(staging: &DuckDBReader) -> Result<()> { + // Find all cache_keys; drop each entry. + let df = staging.execute_sql(&format!( + "SELECT cache_key FROM {META_TABLE}" + ))?; + if df.height() > 0 { + let col = df.column("cache_key").map_err(|e| { + crate::GgsqlError::ReaderError(format!("clear_all col: {e}")) + })?; + let s = as_str(col).map_err(|e| { + crate::GgsqlError::ReaderError(format!("clear_all str: {e}")) + })?; + // Collect keys first to avoid holding borrow during drop_entry mutations. + let keys: Vec = (0..s.len()) + .filter(|&i| !s.is_null(i)) + .map(|i| s.value(i).to_string()) + .collect(); + for k in keys { + let _ = drop_entry(staging, &k); + } + } + // Defensive final wipe in case stale rows linger. + staging.execute_sql(&format!("DELETE FROM {META_TABLE}"))?; + Ok(()) +} + +/// Drop the oldest-accessed entries until total bytes <= max_bytes. +/// Run after every insert. +pub fn evict_over_budget(staging: &DuckDBReader, max_bytes: u64) -> Result<()> { + // Cast SUM result to BIGINT — DuckDB's SUM over BIGINT promotes to HUGEINT, + // which the arrow adapter materializes as Float64 (or unsupported type) + // and would silently break the i64 extractor below. + let sum_sql = format!( + "SELECT CAST(COALESCE(SUM(byte_estimate), 0) AS BIGINT) AS n FROM {META_TABLE}" + ); + loop { + let df = staging.execute_sql(&sum_sql)?; + let total = df + .column("n") + .ok() + .and_then(|c| as_i64(c).ok()) + .and_then(|arr| { + if arr.is_empty() || arr.is_null(0) { + None + } else { + Some(arr.value(0)) + } + }) + .unwrap_or(0) as u64; + if total <= max_bytes { + return Ok(()); + } + // Find the single oldest-accessed key. + let pick = format!( + "SELECT cache_key FROM {META_TABLE} + ORDER BY last_accessed_epoch_ms ASC LIMIT 1" + ); + let df = staging.execute_sql(&pick)?; + if df.height() == 0 { + return Ok(()); // empty but still over budget — impossible, safety + } + let key = df + .column("cache_key") + .ok() + .and_then(|c| as_str(c).ok()) + .and_then(|arr| { + if arr.is_empty() || arr.is_null(0) { + None + } else { + Some(arr.value(0).to_string()) + } + }) + .unwrap_or_default(); + if key.is_empty() { + return Ok(()); + } + drop_entry(staging, &key)?; + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn key_is_stable() { + let a = cache_key("quiver+trino://prod", "SELECT 1"); + let b = cache_key("quiver+trino://prod", "SELECT 1"); + assert_eq!(a, b); + } + + #[test] + fn uri_matters() { + let a = cache_key("quiver+trino://prod", "SELECT 1"); + let b = cache_key("quiver+trino://test", "SELECT 1"); + assert_ne!(a, b); + } + + #[test] + fn sql_matters() { + let a = cache_key("q+t://p", "SELECT 1"); + let b = cache_key("q+t://p", "SELECT 2"); + assert_ne!(a, b); + } + + #[test] + fn uri_sql_cannot_collide() { + // If we didn't include the separator, ("ab", "c") and ("a", "bc") + // would hash to the same concatenation. + let a = cache_key("ab", "c"); + let b = cache_key("a", "bc"); + assert_ne!(a, b); + } + + #[test] + fn table_name_prefix() { + assert!(cache_table_name("deadbeef").starts_with("__ggsql_cache_")); + } + + #[test] + fn ensure_meta_is_idempotent() { + use crate::reader::DuckDBReader; + let staging = DuckDBReader::from_connection_string("duckdb://memory").unwrap(); + ensure_meta(&staging).unwrap(); + // Running twice must not error + ensure_meta(&staging).unwrap(); + // Meta table should be queryable + let df = staging + .execute_sql("SELECT COUNT(*) AS n FROM __ggsql_cache_meta__") + .unwrap(); + assert_eq!(df.height(), 1); + } + + #[test] + fn insert_meta_replaces_existing_row() { + // insert_meta must be idempotent on the cache_key PK so a stale row + // from a previous attempt doesn't prevent the miss path from + // overwriting it. Without `INSERT OR REPLACE`, the second call here + // raises a PK conflict. + use crate::reader::DuckDBReader; + let staging = DuckDBReader::from_connection_string("duckdb://memory").unwrap(); + ensure_meta(&staging).unwrap(); + let key = "samekey"; + insert_meta(&staging, key, "uri-a", "SELECT 1", 1, 100).unwrap(); + // Second insert with same key must NOT raise PK conflict. + insert_meta(&staging, key, "uri-b", "SELECT 2", 2, 200).unwrap(); + let entry = lookup(&staging, key).unwrap().unwrap(); + assert_eq!(entry.reader_uri, "uri-b"); + assert_eq!(entry.sql, "SELECT 2"); + assert_eq!(entry.row_count, 2); + assert_eq!(entry.byte_estimate, 200); + } + + #[test] + fn lookup_insert_touch_cycle() { + use crate::reader::DuckDBReader; + let staging = DuckDBReader::from_connection_string("duckdb://memory").unwrap(); + ensure_meta(&staging).unwrap(); + let key = "abc123"; + + // empty + assert!(lookup(&staging, key).unwrap().is_none()); + + // insert + insert_meta(&staging, key, "quiver+trino://prod", "SELECT 1", 1, 128).unwrap(); + let entry = lookup(&staging, key).unwrap().unwrap(); + assert_eq!(entry.row_count, 1); + assert_eq!(entry.byte_estimate, 128); + + // touch advances last_accessed + let before = entry.last_accessed_epoch_ms; + std::thread::sleep(std::time::Duration::from_millis(10)); + touch(&staging, key).unwrap(); + let after = lookup(&staging, key).unwrap().unwrap(); + assert!(after.last_accessed_epoch_ms > before); + + // drop + drop_entry(&staging, key).unwrap(); + assert!(lookup(&staging, key).unwrap().is_none()); + } +} From bca52bd3bf316a4458c3ebf0b5544f5fdaec2f8a Mon Sep 17 00:00:00 2001 From: Jim Hester Date: Fri, 1 May 2026 17:40:47 -0400 Subject: [PATCH 03/11] test(hybrid_cache): use neutral 'backend://' URIs in cache-key tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The cache module is reader-agnostic, but the test fixtures inherited from the source-of-truth file used 'quiver+trino://' URIs that would read as Netflix-specific to upstream reviewers. Replace with neutral 'backend://' literals — the substance of the tests (key stability, URI sensitivity, lookup/insert cycle) is unchanged. --- src/reader/hybrid_cache.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/reader/hybrid_cache.rs b/src/reader/hybrid_cache.rs index 9e89bc043..45caecd59 100644 --- a/src/reader/hybrid_cache.rs +++ b/src/reader/hybrid_cache.rs @@ -286,15 +286,15 @@ mod tests { #[test] fn key_is_stable() { - let a = cache_key("quiver+trino://prod", "SELECT 1"); - let b = cache_key("quiver+trino://prod", "SELECT 1"); + let a = cache_key("backend://prod", "SELECT 1"); + let b = cache_key("backend://prod", "SELECT 1"); assert_eq!(a, b); } #[test] fn uri_matters() { - let a = cache_key("quiver+trino://prod", "SELECT 1"); - let b = cache_key("quiver+trino://test", "SELECT 1"); + let a = cache_key("backend://prod", "SELECT 1"); + let b = cache_key("backend://test", "SELECT 1"); assert_ne!(a, b); } @@ -364,7 +364,7 @@ mod tests { assert!(lookup(&staging, key).unwrap().is_none()); // insert - insert_meta(&staging, key, "quiver+trino://prod", "SELECT 1", 1, 128).unwrap(); + insert_meta(&staging, key, "backend://prod", "SELECT 1", 1, 128).unwrap(); let entry = lookup(&staging, key).unwrap().unwrap(); assert_eq!(entry.row_count, 1); assert_eq!(entry.byte_estimate, 128); From 8af47d63a57e375740f16d32e53a4900f609648d Mon Sep 17 00:00:00 2001 From: Jim Hester Date: Fri, 1 May 2026 17:42:30 -0400 Subject: [PATCH 04/11] feat(reader): add clear_cache() trait method and declare hybrid_cache mod New `Reader::clear_cache()` method with default `Ok(())` so existing readers (DuckDB, SQLite, ODBC, ADBC) inherit a no-op implementation. `HybridReader` will override it in the next commit to drop its staging-DuckDB cache tables. Also declares `mod hybrid_cache;` (non-pub) so the new helper module is reachable from `hybrid.rs`. --- src/reader/mod.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/reader/mod.rs b/src/reader/mod.rs index 5cf7d3f9d..7c91f0641 100644 --- a/src/reader/mod.rs +++ b/src/reader/mod.rs @@ -345,6 +345,9 @@ pub mod adbc; #[cfg(feature = "duckdb")] pub mod hybrid; +#[cfg(feature = "duckdb")] +mod hybrid_cache; + pub mod connection; pub mod data; mod spec; @@ -633,6 +636,16 @@ pub trait Reader { } Ok(results) } + + /// Clear any cached query results this reader may hold. + /// + /// The default returns `Ok(())` — readers without a cache (DuckDB, + /// SQLite, ODBC, plain ADBC) inherit it transparently. + /// [`HybridReader`](hybrid::HybridReader) overrides it to drop the + /// cache tables it manages in its staging DuckDB. + fn clear_cache(&self) -> Result<()> { + Ok(()) + } } /// A table or view in the schema. From 21ce07516de3ec1e33bab020d60034436c330033 Mon Sep 17 00:00:00 2001 From: Jim Hester Date: Fri, 1 May 2026 17:55:25 -0400 Subject: [PATCH 05/11] feat(reader): wire query-result cache into HybridReader execute_sql now consults the staging DuckDB cache (TTL + LRU byte-budget eviction) before falling through to the primary reader, and stages miss results back under hashed table names. clear_cache() override drops all cache tables. with_cache_config() lets callers tune TTL or byte budget; default is 300s TTL / 512 MB / enabled (gated by GGSQL_HYBRID_CACHE_DISABLED=1). Tests cover default config, custom config application, repeat-query cache hit, ttl=0 always-miss, LRU eviction under tight budget, clear_cache wiping the meta and tables, and the empty-width fast-path. --- src/reader/hybrid.rs | 375 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 368 insertions(+), 7 deletions(-) diff --git a/src/reader/hybrid.rs b/src/reader/hybrid.rs index f630b4385..f36c59c09 100644 --- a/src/reader/hybrid.rs +++ b/src/reader/hybrid.rs @@ -27,7 +27,25 @@ //! returns staging's dialect. That is the correct choice for queries over staged //! data; when you need SQL targeted at the remote source (e.g. schema introspection //! of the remote catalog), use [`HybridReader::data_dialect`] instead. - +//! +//! # Query-Result Cache +//! +//! `HybridReader::execute_sql` memoizes remote query results in the staging +//! DuckDB under hashed table names (`__ggsql_cache_`). Repeat calls +//! with identical `(reader_uri, sql)` within the configured TTL return +//! cached data in <1 ms instead of a round-trip to the primary reader. +//! +//! - Cache is enabled by default; set `GGSQL_HYBRID_CACHE_DISABLED=1` to +//! turn it off, or construct via `HybridReader::with_cache_config` to +//! supply a custom `CacheConfig` (TTL, byte budget). +//! - Eviction is LRU by last-access once the cumulative `byte_estimate` +//! of entries exceeds `CacheConfig::max_bytes` (default 512 MB). +//! - Manual invalidation: `HybridReader::clear_cache()` on the Rust side, +//! `-- @uncache` meta-command in the Jupyter kernel. +//! - Scope: the cache lives in the staging DuckDB instance owned by a +//! single `HybridReader`. No cross-session/disk persistence in v1. + +use crate::reader::hybrid_cache::{self, CacheConfig}; use crate::reader::{DuckDBReader, Reader, Spec, SqlDialect}; use crate::{DataFrame, Result}; use std::cell::RefCell; @@ -37,20 +55,42 @@ pub struct HybridReader { data: Box, staging: DuckDBReader, staged_names: RefCell>, + cache: CacheConfig, } impl HybridReader { /// Construct a `HybridReader` from a primary data reader and a staging - /// DuckDB instance. The staging instance is owned by the `HybridReader` - /// and dropped with it; staged tables do not persist across constructions. + /// DuckDB instance, with default cache config (enabled unless + /// `GGSQL_HYBRID_CACHE_DISABLED=1` is set). pub fn new(data: Box, staging: DuckDBReader) -> Self { + Self::with_cache_config(data, staging, CacheConfig::default()) + } + + /// Construct with caller-supplied cache config. Ensures the meta + /// table exists eagerly so the first remote query doesn't pay the + /// schema-creation cost. Ignoring errors here is intentional: if + /// meta-init fails, subsequent cache ops will fail too and bubble up. + pub fn with_cache_config( + data: Box, + staging: DuckDBReader, + cache: CacheConfig, + ) -> Self { + if cache.enabled { + let _ = hybrid_cache::ensure_meta(&staging); + } Self { data, staging, staged_names: RefCell::new(HashSet::new()), + cache, } } + /// Read-only access to this reader's cache configuration. + pub fn cache_config(&self) -> &CacheConfig { + &self.cache + } + /// Dialect of the primary data backend. Useful for SQL targeted at the /// remote source rather than the staging DuckDB (e.g. schema introspection /// of the remote catalog). @@ -61,11 +101,91 @@ impl HybridReader { impl Reader for HybridReader { fn execute_sql(&self, sql: &str) -> Result { - if references_staged_name(sql, &self.staged_names.borrow()) { - self.staging.execute_sql(sql) - } else { - self.data.execute_sql(sql) + use crate::naming::quote_ident; + + let names = self.staged_names.borrow(); + if references_staged_name(sql, &names) { + return self.staging.execute_sql(sql); } + drop(names); + + // Cache disabled → direct passthrough. + if !self.cache.enabled { + return self.data.execute_sql(sql); + } + + // We need a reader URI for the cache key. The Reader trait doesn't + // expose one — we use a fixed placeholder per HybridReader instance + // (each instance has its own staging DuckDB namespace, so keys don't + // need to cross-collide between instances). + let reader_uri = "hybrid-primary"; + let key = hybrid_cache::cache_key(reader_uri, sql); + let table = hybrid_cache::cache_table_name(&key); + + // Hit: meta row exists AND within TTL AND table exists. + if let Some(entry) = hybrid_cache::lookup(&self.staging, &key)? { + let age_ms = hybrid_cache::now_ms() - entry.fetched_at_epoch_ms; + let ttl_ms = (self.cache.ttl_secs as i64) * 1000; + // Strict `<` so that ttl=0 always misses, even when racing within + // the same millisecond (age_ms = 0, ttl_ms = 0). + if age_ms < ttl_ms { + let select = format!("SELECT * FROM {}", quote_ident(&table)); + match self.staging.execute_sql(&select) { + Ok(df) => { + let _ = hybrid_cache::touch(&self.staging, &key); + return Ok(df); + } + Err(_) => { + // Cache table vanished (manual drop, crash mid-insert). + // Fall through to miss path. + let _ = hybrid_cache::drop_entry(&self.staging, &key); + } + } + } else { + // Stale — evict and refetch. + let _ = hybrid_cache::drop_entry(&self.staging, &key); + } + } + + // Miss (or stale after eviction) — fetch from primary and stage. + let df = self.data.execute_sql(sql)?; + let row_count = df.height() as i64; + let byte_estimate = estimate_bytes(&df); + + // DuckDB's `arrow(...)` table function (used by `register`) rejects + // schemas with zero columns. The viz pipeline occasionally issues + // metadata-only queries that produce empty results; cache those by + // value instead of by table. + if df.width() == 0 { + return Ok(df); + } + + self.staging.register(&table, df.clone(), true)?; + if let Err(e) = hybrid_cache::insert_meta( + &self.staging, + &key, + reader_uri, + sql, + row_count, + byte_estimate, + ) { + // insert_meta failed AFTER register succeeded — clean up the + // orphan cache table so it doesn't linger forever without a + // tracking meta row (which would prevent it from ever being + // evicted or garbage-collected). + let _ = self + .staging + .execute_sql(&format!("DROP TABLE IF EXISTS {}", quote_ident(&table))); + return Err(e); + } + // Eviction is bookkeeping — if it hiccups (transient DuckDB error, + // internal SQL bug), the user's data is already cached and the + // primary fetch succeeded. Don't surface the failure as a query + // error; just log and continue. + if let Err(e) = hybrid_cache::evict_over_budget(&self.staging, self.cache.max_bytes) { + eprintln!("ggsql: cache eviction failed (non-fatal): {e}"); + } + Ok(df) } fn register(&self, name: &str, df: DataFrame, replace: bool) -> Result<()> { @@ -91,6 +211,20 @@ impl Reader for HybridReader { // remote catalog) can access it via `HybridReader::data_dialect()`. self.staging.dialect() } + + fn clear_cache(&self) -> Result<()> { + if self.cache.enabled { + hybrid_cache::clear_all(&self.staging)?; + } + Ok(()) + } +} + +fn estimate_bytes(df: &DataFrame) -> i64 { + df.get_columns() + .iter() + .map(|col| col.get_array_memory_size()) + .sum::() as i64 } /// Check whether `sql` references any name in `staged_names` as a SQL @@ -402,4 +536,231 @@ mod tests { "expected staging-side error mentioning the missing `remote_only` table; got: {err_msg}" ); } + + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + + /// A reader that wraps an inner `DuckDBReader` and increments a shared + /// counter on every `execute_sql` call. Cache-hit tests use the + /// counter to assert "the second call did NOT reach the primary." + /// The counter is `Arc` so a clone can be retained by + /// the test after the reader itself is moved into `Box`. + struct CountingReader { + inner: DuckDBReader, + calls: Arc, + } + + impl CountingReader { + fn new(inner: DuckDBReader, calls: Arc) -> Self { + Self { inner, calls } + } + } + + impl Reader for CountingReader { + fn execute_sql(&self, sql: &str) -> Result { + self.calls.fetch_add(1, Ordering::SeqCst); + self.inner.execute_sql(sql) + } + fn register(&self, name: &str, df: DataFrame, replace: bool) -> Result<()> { + self.inner.register(name, df, replace) + } + fn unregister(&self, name: &str) -> Result<()> { + self.inner.unregister(name) + } + fn execute(&self, query: &str) -> Result { + crate::reader::execute_with_reader(self, query) + } + fn dialect(&self) -> &dyn SqlDialect { + self.inner.dialect() + } + } + + #[test] + fn new_has_cache_enabled_by_default() { + let data = DuckDBReader::from_connection_string("duckdb://memory").unwrap(); + let staging = DuckDBReader::from_connection_string("duckdb://memory").unwrap(); + let r = HybridReader::new(Box::new(data), staging); + assert!(r.cache_config().enabled); + assert_eq!(r.cache_config().ttl_secs, 300); + } + + #[test] + fn with_cache_config_applies_custom_settings() { + use crate::reader::hybrid_cache::CacheConfig; + let data = DuckDBReader::from_connection_string("duckdb://memory").unwrap(); + let staging = DuckDBReader::from_connection_string("duckdb://memory").unwrap(); + let cfg = CacheConfig { + enabled: false, + ttl_secs: 60, + max_bytes: 1024, + }; + let r = HybridReader::with_cache_config(Box::new(data), staging, cfg); + assert!(!r.cache_config().enabled); + assert_eq!(r.cache_config().ttl_secs, 60); + assert_eq!(r.cache_config().max_bytes, 1024); + } + + #[test] + fn repeat_query_hits_cache_not_data() { + // Two identical execute_sql calls must result in only ONE + // execute_sql reaching the primary reader — the second call hits + // the cache. + let data_inner = DuckDBReader::from_connection_string("duckdb://memory").unwrap(); + data_inner + .execute_sql("CREATE TABLE t AS SELECT 1 AS x UNION ALL SELECT 2") + .unwrap(); + let calls = Arc::new(AtomicUsize::new(0)); + let counting = CountingReader::new(data_inner, calls.clone()); + let staging = DuckDBReader::from_connection_string("duckdb://memory").unwrap(); + let r = HybridReader::new(Box::new(counting), staging); + + // The CREATE TABLE above ran on `data_inner` BEFORE wrapping in + // CountingReader, so `calls` is still 0. Two identical SELECTs + // through the HybridReader: first miss → counter to 1; second + // hit → counter stays at 1. + let sql = "SELECT x FROM t ORDER BY x"; + r.execute_sql(sql).unwrap(); + r.execute_sql(sql).unwrap(); + assert_eq!( + calls.load(Ordering::SeqCst), + 1, + "second execute_sql must be served by cache" + ); + } + + #[test] + fn ttl_zero_forces_miss_every_call() { + use crate::reader::hybrid_cache::CacheConfig; + let data_inner = DuckDBReader::from_connection_string("duckdb://memory").unwrap(); + data_inner + .execute_sql("CREATE TABLE t AS SELECT 1 AS x") + .unwrap(); + let calls = Arc::new(AtomicUsize::new(0)); + let counting = CountingReader::new(data_inner, calls.clone()); + let staging = DuckDBReader::from_connection_string("duckdb://memory").unwrap(); + let cfg = CacheConfig { + enabled: true, + ttl_secs: 0, + max_bytes: 1 << 30, + }; + let r = HybridReader::with_cache_config(Box::new(counting), staging, cfg); + + // No sleep between calls — the boundary check must be strict + // (`age_ms < ttl_ms`) so ttl=0 always misses, even when racing + // within the same millisecond. + r.execute_sql("SELECT x FROM t").unwrap(); + r.execute_sql("SELECT x FROM t").unwrap(); + assert_eq!( + calls.load(Ordering::SeqCst), + 2, + "ttl=0 must always miss" + ); + } + + #[test] + fn lru_evicts_oldest_when_over_budget() { + use crate::reader::hybrid_cache::CacheConfig; + let data_inner = DuckDBReader::from_connection_string("duckdb://memory").unwrap(); + data_inner + .execute_sql("CREATE TABLE t AS SELECT 1 AS x UNION ALL SELECT 2") + .unwrap(); + let calls = Arc::new(AtomicUsize::new(0)); + let counting = CountingReader::new(data_inner, calls.clone()); + let staging = DuckDBReader::from_connection_string("duckdb://memory").unwrap(); + // Tiny budget: 1 byte. Every cached entry exceeds it, so eviction + // fires deterministically on every insert. + let cfg = CacheConfig { + enabled: true, + ttl_secs: 300, + max_bytes: 1, + }; + let r = HybridReader::with_cache_config(Box::new(counting), staging, cfg); + + r.execute_sql("SELECT x FROM t WHERE x = 1").unwrap(); // miss → stage A + std::thread::sleep(std::time::Duration::from_millis(5)); + r.execute_sql("SELECT x FROM t WHERE x = 2").unwrap(); // miss → stage B; A evicted + let before = calls.load(Ordering::SeqCst); + r.execute_sql("SELECT x FROM t WHERE x = 1").unwrap(); // re-query A — must MISS + assert_eq!( + calls.load(Ordering::SeqCst), + before + 1, + "A should have been evicted by LRU and re-fetched on re-query" + ); + } + + #[test] + fn clear_cache_drops_everything() { + let data_inner = DuckDBReader::from_connection_string("duckdb://memory").unwrap(); + data_inner + .execute_sql("CREATE TABLE t AS SELECT 1 AS x") + .unwrap(); + let calls = Arc::new(AtomicUsize::new(0)); + let counting = CountingReader::new(data_inner, calls.clone()); + let staging = DuckDBReader::from_connection_string("duckdb://memory").unwrap(); + let r = HybridReader::new(Box::new(counting), staging); + + r.execute_sql("SELECT x FROM t").unwrap(); // miss + r.execute_sql("SELECT x FROM t").unwrap(); // hit + assert_eq!(calls.load(Ordering::SeqCst), 1); + + r.clear_cache().unwrap(); + + r.execute_sql("SELECT x FROM t").unwrap(); // miss again — cache wiped + assert_eq!(calls.load(Ordering::SeqCst), 2); + } + + #[test] + fn viz_execute_shares_cache_with_execute_sql() { + // Cache is keyed on (reader_uri, sql). The viz pipeline wraps the + // SQL portion in DDL (`CREATE OR REPLACE TEMP TABLE __ggsql_global__ + // AS `) and issues additional schema/range/data sub-queries + // against the staged temp table. The DDL itself is uncacheable + // (DuckDB returns a 0-column result for DDL, which the cache skips + // to avoid `arrow(...)` rejecting an empty schema), but every + // result-bearing sub-query routes through the cache. + // + // Load-bearing claim verified here: caching is wired into the viz + // path. After running a viz query, manually issuing one of the + // sub-queries the viz pipeline emits (e.g. the data fetch over + // the global temp table) hits the cache and does NOT advance the + // data counter. This guarantees that any pipeline that re-emits + // the same SQL string within the TTL is memoized. + let data_inner = DuckDBReader::from_connection_string("duckdb://memory").unwrap(); + data_inner + .execute_sql("CREATE TABLE t AS SELECT 1 AS x, 10 AS y UNION ALL SELECT 2, 20") + .unwrap(); + let calls = Arc::new(AtomicUsize::new(0)); + let counting = CountingReader::new(data_inner, calls.clone()); + let staging = DuckDBReader::from_connection_string("duckdb://memory").unwrap(); + let r = HybridReader::new(Box::new(counting), staging); + + // Run the viz pipeline. This emits the temp-table DDL plus + // schema/range/data sub-queries, all of which (except the DDL) + // populate the cache. + let viz_query = + "SELECT x, y FROM t ORDER BY x\nVISUALISE x AS x, y AS y\nDRAW point"; + let _spec = r.execute(viz_query).unwrap(); + let after_viz = calls.load(Ordering::SeqCst); + assert!( + after_viz >= 1, + "viz call must hit data at least once on cache miss; got {after_viz}" + ); + + // Re-issue the schema-fetch sub-query that the viz pipeline + // emitted internally. The exact SQL depends on the global-table + // name (which embeds a process-stable session UUID), so we + // reconstruct it the same way the pipeline does. + let global = crate::naming::quote_ident(&crate::naming::global_table()); + let schema_sql = format!( + "SELECT * FROM (SELECT * FROM {global}) AS __schema__ LIMIT 1" + ); + let before_replay = calls.load(Ordering::SeqCst); + let _ = r.execute_sql(&schema_sql).unwrap(); + assert_eq!( + calls.load(Ordering::SeqCst), + before_replay, + "replaying a sub-query the viz pipeline already issued must hit \ + the cache — data counter must not advance" + ); + } } From 248b7a44b0925083cfab6b44cb2bb4ca5cef40f1 Mon Sep 17 00:00:00 2001 From: Jim Hester Date: Fri, 1 May 2026 18:03:11 -0400 Subject: [PATCH 06/11] feat(jupyter): add -- @uncache meta-command Lets notebook users invalidate any caches their reader holds without restarting the kernel. Mirrors the existing -- @connect: pattern: prefix constant, parser returning Some(()) on a clean prefix-only line, and a dispatch arm in QueryExecutor::execute that calls Reader::clear_cache(). Returns an empty DataFrame so the cell renders without further machinery. For non-HybridReader readers the trait default makes this a clean no-op; HybridReader overrides to drop its DuckDB cache tables. --- ggsql-jupyter/src/executor.rs | 38 +++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/ggsql-jupyter/src/executor.rs b/ggsql-jupyter/src/executor.rs index 6f928e7ab..3a7af8de3 100644 --- a/ggsql-jupyter/src/executor.rs +++ b/ggsql-jupyter/src/executor.rs @@ -145,6 +145,22 @@ pub fn parse_meta_command(code: &str) -> Option { .map(|rest| rest.trim().to_string()) } +/// The `-- @uncache` meta-command prefix. +const META_UNCACHE_PREFIX: &str = "-- @uncache"; + +/// Parse a `-- @uncache` meta-command. Returns `Some(())` if `code`, +/// after trimming, is exactly the prefix followed only by whitespace. +pub fn parse_uncache_meta_command(code: &str) -> Option<()> { + let trimmed = code.trim(); + if trimmed.starts_with(META_UNCACHE_PREFIX) + && trimmed[META_UNCACHE_PREFIX.len()..].trim().is_empty() + { + Some(()) + } else { + None + } +} + /// Query executor maintaining persistent database connection pub struct QueryExecutor { reader: Box, @@ -200,6 +216,12 @@ impl QueryExecutor { tracing::debug!("Executing query: {} chars", code.len()); // Check for meta-commands first + if parse_uncache_meta_command(code).is_some() { + tracing::info!("Meta-command: clearing cache"); + self.reader.clear_cache()?; + return Ok(ExecutionResult::DataFrame(ggsql::DataFrame::empty())); + } + if let Some(uri) = parse_meta_command(code) { tracing::info!("Meta-command: switching reader to {}", uri); self.swap_reader(&uri)?; @@ -285,6 +307,22 @@ mod tests { assert_eq!(parse_meta_command("SELECT 1"), None); } + #[test] + fn uncache_meta_command_parses() { + assert_eq!(parse_uncache_meta_command("-- @uncache"), Some(())); + assert_eq!(parse_uncache_meta_command("-- @uncache \n"), Some(())); + assert_eq!(parse_uncache_meta_command("SELECT 1"), None); + } + + #[test] + fn uncache_clears_reader_cache() { + let mut ex = QueryExecutor::new().unwrap(); + // duckdb://memory doesn't have HybridReader; clear_cache is a no-op. + // We just assert the dispatch doesn't error. + let res = ex.execute("-- @uncache").unwrap(); + assert!(matches!(res, ExecutionResult::DataFrame(_))); + } + #[test] fn test_meta_command_switches_reader() { let mut executor = QueryExecutor::new().unwrap(); From c357cff3fad4d503e8a14410428f5e5aeb767a4b Mon Sep 17 00:00:00 2001 From: Jim Hester Date: Fri, 1 May 2026 18:05:43 -0400 Subject: [PATCH 07/11] feat(jupyter): emit both Vega-Lite v5 and v6 mime payloads Frontends vary in which Vega-Lite version they render natively: JupyterLab 4.x's built-in @jupyterlab/vega5-extension only handles v5; nteract and newer Lab extensions render v6. Emitting both lets the client pick whichever it supports without the user installing an extension or trusting a CDN-loaded vega-embed. The v5 payload has its \$schema URL rewritten to the v5 schema since the JupyterLab vega5 extension validates schema-URL-vs-mime-version agreement. The two specs are otherwise identical; ggsql's generated output uses core Vega-Lite features stable across v5 and v6. --- ggsql-jupyter/src/display.rs | 71 ++++++++++++++++++++++++++++++++++-- 1 file changed, 68 insertions(+), 3 deletions(-) diff --git a/ggsql-jupyter/src/display.rs b/ggsql-jupyter/src/display.rs index ebf03a7ef..6eb63a70b 100644 --- a/ggsql-jupyter/src/display.rs +++ b/ggsql-jupyter/src/display.rs @@ -101,14 +101,55 @@ fn format_connection_changed(display_name: &str) -> Value { /// Format Vega-Lite visualization as display_data fn format_vegalite(spec: String, hints: &RenderHints) -> Value { let html = vegalite_html(&spec, hints); + + let spec_value: Value = serde_json::from_str(&spec).unwrap_or_else(|e| { + tracing::error!("Failed to parse Vega-Lite JSON: {}", e); + json!({"error": "Invalid Vega-Lite JSON"}) + }); + + // Rewrite the spec's $schema to v5 for the v5 mime bundle so clients + // that validate the schema URL against the mime version (notably + // JupyterLab 4.x's built-in @jupyterlab/vega5-extension) accept it. + // The two mime payloads are otherwise identical; ggsql's generated + // specs use core Vega-Lite features that are stable across v5 and v6. + let mut spec_v5 = spec_value.clone(); + if let Some(obj) = spec_v5.as_object_mut() { + obj.insert( + "$schema".to_string(), + json!("https://vega.github.io/schema/vega-lite/v5.json"), + ); + } + json!({ "data": { + // Newer native mime bundle. nteract and newer Lab extensions + // render this directly. JupyterLab 4.x does NOT have a built-in + // v6 renderer and will fall through to the v5 bundle below. + "application/vnd.vegalite.v6+json": spec_value, + + // v5 native mime bundle — JupyterLab 4.x has a built-in + // renderer for this (no extra extensions, no script execution, + // no CDN round-trip). Chosen preferentially over text/html. + "application/vnd.vegalite.v5+json": spec_v5, + + // HTML with embedded vega-embed as a last-resort fallback for + // clients that lack any native Vega-Lite renderer. Requires + // notebook trust because it contains a