From d1f8107de21eada986c67b95f52f9baa242c9e1c Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Fri, 22 May 2026 23:49:09 +0100 Subject: [PATCH 1/9] Use ArcSwap for aggregate fn registry Signed-off-by: Robert Kruszewski --- vortex-array/src/aggregate_fn/accumulator.rs | 14 +++----- .../src/aggregate_fn/accumulator_grouped.rs | 14 ++++---- vortex-array/src/aggregate_fn/proto.rs | 2 +- vortex-array/src/aggregate_fn/session.rs | 36 +++++++++++-------- 4 files changed, 34 insertions(+), 32 deletions(-) diff --git a/vortex-array/src/aggregate_fn/accumulator.rs b/vortex-array/src/aggregate_fn/accumulator.rs index 762d5e00fb0..669858d9479 100644 --- a/vortex-array/src/aggregate_fn/accumulator.rs +++ b/vortex-array/src/aggregate_fn/accumulator.rs @@ -142,7 +142,7 @@ impl DynAccumulator for Accumulator { } let session = ctx.session().clone(); - let kernels = &session.aggregate_fns().kernels; + let kernels = &session.aggregate_fns().kernels.load(); // 1. Kernel registry first: a registered `(encoding, aggregate_fn)` kernel is strictly // more specific than the vtable's `try_accumulate` short-circuit. Checking the @@ -150,13 +150,11 @@ impl DynAccumulator for Accumulator { // `Combined::try_accumulate` always returns true, so a later kernel check would be // unreachable. { - let kernels_r = kernels.read(); let batch_id = batch.encoding_id(); - let kernel = kernels_r + let kernel = kernels .get(&(batch_id, Some(self.aggregate_fn.id()))) - .or_else(|| kernels_r.get(&(batch_id, None))) + .or_else(|| kernels.get(&(batch_id, None))) .copied(); - drop(kernels_r); if let Some(kernel) = kernel && let Some(result) = kernel.aggregate(&self.aggregate_fn, batch, ctx)? { @@ -187,13 +185,11 @@ impl DynAccumulator for Accumulator { break; } - let kernels_r = kernels.read(); let batch_id = batch.encoding_id(); - let kernel = kernels_r + let kernel = kernels .get(&(batch_id, Some(self.aggregate_fn.id()))) - .or_else(|| kernels_r.get(&(batch_id, None))) + .or_else(|| kernels.get(&(batch_id, None))) .copied(); - drop(kernels_r); if let Some(kernel) = kernel && let Some(result) = kernel.aggregate(&self.aggregate_fn, &batch, ctx)? { diff --git a/vortex-array/src/aggregate_fn/accumulator_grouped.rs b/vortex-array/src/aggregate_fn/accumulator_grouped.rs index a751a7c5749..9f28f317fdc 100644 --- a/vortex-array/src/aggregate_fn/accumulator_grouped.rs +++ b/vortex-array/src/aggregate_fn/accumulator_grouped.rs @@ -163,17 +163,16 @@ impl GroupedAccumulator { let mut elements = groups.elements().clone(); let groups_validity = groups.validity()?; let session = ctx.session().clone(); - let kernels = &session.aggregate_fns().grouped_kernels; + let kernels = &session.aggregate_fns().grouped_kernels.load(); for _ in 0..max_iterations() { if elements.is::() { break; } - let kernels_r = kernels.read(); - if let Some(result) = kernels_r + if let Some(result) = kernels .get(&(elements.encoding_id(), Some(self.aggregate_fn.id()))) - .or_else(|| kernels_r.get(&(elements.encoding_id(), None))) + .or_else(|| kernels.get(&(elements.encoding_id(), None))) .and_then(|kernel| { // SAFETY: we assume that elements execution is safe let groups = unsafe { @@ -255,17 +254,16 @@ impl GroupedAccumulator { let mut elements = groups.elements().clone(); let groups_validity = groups.validity()?; let session = ctx.session().clone(); - let kernels = &session.aggregate_fns().grouped_kernels; + let kernels = &session.aggregate_fns().grouped_kernels.load(); for _ in 0..64 { if elements.is::() { break; } - let kernels_r = kernels.read(); - if let Some(result) = kernels_r + if let Some(result) = kernels .get(&(elements.encoding_id(), Some(self.aggregate_fn.id()))) - .or_else(|| kernels_r.get(&(elements.encoding_id(), None))) + .or_else(|| kernels.get(&(elements.encoding_id(), None))) .and_then(|kernel| { // SAFETY: we assume that elements execution is safe let groups = unsafe { diff --git a/vortex-array/src/aggregate_fn/proto.rs b/vortex-array/src/aggregate_fn/proto.rs index 9bcfab5818c..bff198b3955 100644 --- a/vortex-array/src/aggregate_fn/proto.rs +++ b/vortex-array/src/aggregate_fn/proto.rs @@ -36,7 +36,7 @@ impl AggregateFnRef { /// Note: the serialization format is not stable and may change between versions. pub fn from_proto(proto: &pb::AggregateFn, session: &VortexSession) -> VortexResult { let agg_fn_id: AggregateFnId = AggregateFnId::new(proto.id.as_str()); - let agg_fn = if let Some(plugin) = session.aggregate_fns().registry().find(&agg_fn_id) { + let agg_fn = if let Some(plugin) = session.aggregate_fns().find_plugin(&agg_fn_id) { plugin.deserialize(proto.metadata(), session)? } else if session.allows_unknown() { new_foreign_aggregate_fn(agg_fn_id, proto.metadata().to_vec()) diff --git a/vortex-array/src/aggregate_fn/session.rs b/vortex-array/src/aggregate_fn/session.rs index d89f9da069d..1d3400bb68d 100644 --- a/vortex-array/src/aggregate_fn/session.rs +++ b/vortex-array/src/aggregate_fn/session.rs @@ -4,7 +4,7 @@ use std::any::Any; use std::sync::Arc; -use parking_lot::RwLock; +use arc_swap::ArcSwap; use vortex_session::Ref; use vortex_session::SessionExt; use vortex_session::SessionVar; @@ -49,10 +49,10 @@ pub type AggregateFnRegistry = Registry; /// Session state for aggregate function vtables. #[derive(Debug)] pub struct AggregateFnSession { - registry: AggregateFnRegistry, + registry: ArcSwap>, - pub(super) kernels: RwLock>, - pub(super) grouped_kernels: RwLock>, + pub(super) kernels: ArcSwap>, + pub(super) grouped_kernels: ArcSwap>, } impl SessionVar for AggregateFnSession { @@ -70,9 +70,9 @@ type KernelKey = (ArrayId, Option); impl Default for AggregateFnSession { fn default() -> Self { let this = Self { - registry: AggregateFnRegistry::default(), - kernels: RwLock::new(HashMap::default()), - grouped_kernels: RwLock::new(HashMap::default()), + registry: ArcSwap::from_pointee(HashMap::default()), + kernels: ArcSwap::from_pointee(HashMap::default()), + grouped_kernels: ArcSwap::from_pointee(HashMap::default()), }; // Register the built-in aggregate functions @@ -107,15 +107,20 @@ impl Default for AggregateFnSession { impl AggregateFnSession { /// Returns the aggregate function registry. - pub fn registry(&self) -> &AggregateFnRegistry { - &self.registry + pub fn find_plugin(&self, id: &AggregateFnId) -> Option { + self.registry.load().get(id).cloned() } /// Register an aggregate function vtable in the session, replacing any existing vtable with /// the same ID. pub fn register(&self, vtable: V) { - self.registry - .register(vtable.id(), Arc::new(vtable) as AggregateFnPluginRef); + let id = vtable.id(); + let pluginref = Arc::new(vtable) as AggregateFnPluginRef; + self.registry.rcu(move |registry| { + let mut existing = registry.as_ref().clone(); + existing.insert(id, Arc::clone(&pluginref)); + existing + }); } /// Register an aggregate function kernel for a specific aggregate function and array type. @@ -125,9 +130,12 @@ impl AggregateFnSession { agg_fn_id: Option>, kernel: &'static dyn DynAggregateKernel, ) { - self.kernels - .write() - .insert((array_id.into(), agg_fn_id.map(|id| id.into())), kernel); + let id = (array_id.into(), agg_fn_id.map(|id| id.into())); + self.kernels.rcu(move |registry| { + let mut existing = registry.as_ref().clone(); + existing.insert(id, kernel); + existing + }); } } From cfed1144f5f1808a50f643c75e52f684acc5abda Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Wed, 27 May 2026 11:25:16 +0100 Subject: [PATCH 2/9] more Signed-off-by: Robert Kruszewski --- vortex-array/src/aggregate_fn/accumulator.rs | 16 +++----- .../src/aggregate_fn/accumulator_grouped.rs | 14 +++---- vortex-array/src/aggregate_fn/session.rs | 38 +++++++++++++++---- 3 files changed, 42 insertions(+), 26 deletions(-) diff --git a/vortex-array/src/aggregate_fn/accumulator.rs b/vortex-array/src/aggregate_fn/accumulator.rs index 669858d9479..cf5e7f6c219 100644 --- a/vortex-array/src/aggregate_fn/accumulator.rs +++ b/vortex-array/src/aggregate_fn/accumulator.rs @@ -142,7 +142,7 @@ impl DynAccumulator for Accumulator { } let session = ctx.session().clone(); - let kernels = &session.aggregate_fns().kernels.load(); + let aggregate_fns = session.aggregate_fns(); // 1. Kernel registry first: a registered `(encoding, aggregate_fn)` kernel is strictly // more specific than the vtable's `try_accumulate` short-circuit. Checking the @@ -150,11 +150,8 @@ impl DynAccumulator for Accumulator { // `Combined::try_accumulate` always returns true, so a later kernel check would be // unreachable. { - let batch_id = batch.encoding_id(); - let kernel = kernels - .get(&(batch_id, Some(self.aggregate_fn.id()))) - .or_else(|| kernels.get(&(batch_id, None))) - .copied(); + let kernel = + aggregate_fns.find_aggregate_kernel(batch.encoding_id(), self.aggregate_fn.id()); if let Some(kernel) = kernel && let Some(result) = kernel.aggregate(&self.aggregate_fn, batch, ctx)? { @@ -185,11 +182,8 @@ impl DynAccumulator for Accumulator { break; } - let batch_id = batch.encoding_id(); - let kernel = kernels - .get(&(batch_id, Some(self.aggregate_fn.id()))) - .or_else(|| kernels.get(&(batch_id, None))) - .copied(); + let kernel = + aggregate_fns.find_aggregate_kernel(batch.encoding_id(), self.aggregate_fn.id()); if let Some(kernel) = kernel && let Some(result) = kernel.aggregate(&self.aggregate_fn, &batch, ctx)? { diff --git a/vortex-array/src/aggregate_fn/accumulator_grouped.rs b/vortex-array/src/aggregate_fn/accumulator_grouped.rs index 9f28f317fdc..3fb53229b4e 100644 --- a/vortex-array/src/aggregate_fn/accumulator_grouped.rs +++ b/vortex-array/src/aggregate_fn/accumulator_grouped.rs @@ -163,16 +163,15 @@ impl GroupedAccumulator { let mut elements = groups.elements().clone(); let groups_validity = groups.validity()?; let session = ctx.session().clone(); - let kernels = &session.aggregate_fns().grouped_kernels.load(); + let aggregate_fns = session.aggregate_fns(); for _ in 0..max_iterations() { if elements.is::() { break; } - if let Some(result) = kernels - .get(&(elements.encoding_id(), Some(self.aggregate_fn.id()))) - .or_else(|| kernels.get(&(elements.encoding_id(), None))) + if let Some(result) = aggregate_fns + .find_groupped_kernel(elements.encoding_id(), self.aggregate_fn.id()) .and_then(|kernel| { // SAFETY: we assume that elements execution is safe let groups = unsafe { @@ -254,16 +253,15 @@ impl GroupedAccumulator { let mut elements = groups.elements().clone(); let groups_validity = groups.validity()?; let session = ctx.session().clone(); - let kernels = &session.aggregate_fns().grouped_kernels.load(); + let aggregate_fns = session.aggregate_fns(); for _ in 0..64 { if elements.is::() { break; } - if let Some(result) = kernels - .get(&(elements.encoding_id(), Some(self.aggregate_fn.id()))) - .or_else(|| kernels.get(&(elements.encoding_id(), None))) + if let Some(result) = aggregate_fns + .find_groupped_kernel(elements.encoding_id(), self.aggregate_fn.id()) .and_then(|kernel| { // SAFETY: we assume that elements execution is safe let groups = unsafe { diff --git a/vortex-array/src/aggregate_fn/session.rs b/vortex-array/src/aggregate_fn/session.rs index 1d3400bb68d..90dbb48729f 100644 --- a/vortex-array/src/aggregate_fn/session.rs +++ b/vortex-array/src/aggregate_fn/session.rs @@ -8,7 +8,6 @@ use arc_swap::ArcSwap; use vortex_session::Ref; use vortex_session::SessionExt; use vortex_session::SessionVar; -use vortex_session::registry::Registry; use vortex_utils::aliases::hash_map::HashMap; use crate::aggregate_fn::AggregateFnId; @@ -43,16 +42,13 @@ use crate::arrays::dict::compute::is_constant::DictIsConstantKernel; use crate::arrays::dict::compute::is_sorted::DictIsSortedKernel; use crate::arrays::dict::compute::min_max::DictMinMaxKernel; -/// Registry of aggregate function vtables. -pub type AggregateFnRegistry = Registry; - /// Session state for aggregate function vtables. #[derive(Debug)] pub struct AggregateFnSession { registry: ArcSwap>, - pub(super) kernels: ArcSwap>, - pub(super) grouped_kernels: ArcSwap>, + kernels: ArcSwap>, + grouped_kernels: ArcSwap>, } impl SessionVar for AggregateFnSession { @@ -106,7 +102,7 @@ impl Default for AggregateFnSession { } impl AggregateFnSession { - /// Returns the aggregate function registry. + /// Find plugin in the registry for the given id pub fn find_plugin(&self, id: &AggregateFnId) -> Option { self.registry.load().get(id).cloned() } @@ -123,6 +119,20 @@ impl AggregateFnSession { }); } + pub fn find_aggregate_kernel( + &self, + array_id: impl Into, + agg_fn_id: impl Into, + ) -> Option<&'static dyn DynAggregateKernel> { + let loaded = self.kernels.load(); + let id = array_id.into(); + let fn_id = agg_fn_id.into(); + loaded + .get(&(id, Some(fn_id))) + .or_else(|| loaded.get(&(id, None))) + .copied() + } + /// Register an aggregate function kernel for a specific aggregate function and array type. pub fn register_aggregate_kernel( &self, @@ -137,6 +147,20 @@ impl AggregateFnSession { existing }); } + + pub fn find_groupped_kernel( + &self, + array_id: impl Into, + agg_fn_id: impl Into, + ) -> Option<&'static dyn DynGroupedAggregateKernel> { + let loaded = self.grouped_kernels.load(); + let id = array_id.into(); + let fn_id = agg_fn_id.into(); + loaded + .get(&(id, Some(fn_id))) + .or_else(|| loaded.get(&(id, None))) + .copied() + } } /// Extension trait for accessing aggregate function session data. From e006bfd6b5b6c8c85d41cb91fe57d23c6ae03927 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Wed, 27 May 2026 11:26:40 +0100 Subject: [PATCH 3/9] rename Signed-off-by: Robert Kruszewski --- vortex-array/src/aggregate_fn/accumulator_grouped.rs | 4 ++-- vortex-array/src/aggregate_fn/session.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/vortex-array/src/aggregate_fn/accumulator_grouped.rs b/vortex-array/src/aggregate_fn/accumulator_grouped.rs index 3fb53229b4e..be940d8c474 100644 --- a/vortex-array/src/aggregate_fn/accumulator_grouped.rs +++ b/vortex-array/src/aggregate_fn/accumulator_grouped.rs @@ -171,7 +171,7 @@ impl GroupedAccumulator { } if let Some(result) = aggregate_fns - .find_groupped_kernel(elements.encoding_id(), self.aggregate_fn.id()) + .find_grouped_kernel(elements.encoding_id(), self.aggregate_fn.id()) .and_then(|kernel| { // SAFETY: we assume that elements execution is safe let groups = unsafe { @@ -261,7 +261,7 @@ impl GroupedAccumulator { } if let Some(result) = aggregate_fns - .find_groupped_kernel(elements.encoding_id(), self.aggregate_fn.id()) + .find_grouped_kernel(elements.encoding_id(), self.aggregate_fn.id()) .and_then(|kernel| { // SAFETY: we assume that elements execution is safe let groups = unsafe { diff --git a/vortex-array/src/aggregate_fn/session.rs b/vortex-array/src/aggregate_fn/session.rs index 90dbb48729f..c145ad4a2d9 100644 --- a/vortex-array/src/aggregate_fn/session.rs +++ b/vortex-array/src/aggregate_fn/session.rs @@ -148,7 +148,7 @@ impl AggregateFnSession { }); } - pub fn find_groupped_kernel( + pub fn find_grouped_kernel( &self, array_id: impl Into, agg_fn_id: impl Into, From 2a308a26930b91df78d05de7751a08ddc222de51 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Fri, 29 May 2026 09:27:51 +0100 Subject: [PATCH 4/9] logs Signed-off-by: Robert Kruszewski --- vortex-test/compat-gen/src/check.rs | 87 ++++++++++++++++++++++++-- vortex-test/compat-gen/src/generate.rs | 22 ++++++- 2 files changed, 101 insertions(+), 8 deletions(-) diff --git a/vortex-test/compat-gen/src/check.rs b/vortex-test/compat-gen/src/check.rs index 7533ee9dab7..ad10d8201d3 100644 --- a/vortex-test/compat-gen/src/check.rs +++ b/vortex-test/compat-gen/src/check.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::path::Path; +use std::time::Instant; use clap::ValueEnum; use serde::Serialize; @@ -63,13 +64,42 @@ pub fn check(dir: &Path, mode: Mode, exclude: &[String]) -> VortexResult<()> { // Generate fresh fixtures into a temp directory. let tmp_dir = tempfile::tempdir().map_err(|e| vortex_err!("failed to create temp dir: {e}"))?; - eprintln!("generating fresh fixtures for comparison..."); - for fixture in &fixtures { - fixture.write(tmp_dir.path())?; + let generation_start = Instant::now(); + eprintln!( + "generating {} fresh fixtures for comparison in {}...", + fixtures.len(), + tmp_dir.path().display() + ); + for (idx, fixture) in fixtures.iter().enumerate() { + let fixture_start = Instant::now(); + eprintln!( + " generating {}/{} {}...", + idx + 1, + fixtures.len(), + fixture.name() + ); + let entries = fixture.write(tmp_dir.path())?; + let written = entries + .iter() + .map(|entry| entry.name.as_str()) + .collect::>() + .join(", "); + eprintln!( + " generated {}/{} {} in {:.3}s: {}", + idx + 1, + fixtures.len(), + fixture.name(), + fixture_start.elapsed().as_secs_f64(), + written + ); } + eprintln!( + "generated fresh fixtures in {:.3}s", + generation_start.elapsed().as_secs_f64() + ); // Collect .vortex files in the check directory. - let dir_files: Vec = std::fs::read_dir(dir) + let mut dir_files: Vec = std::fs::read_dir(dir) .map_err(|e| vortex_err!("failed to read dir {}: {e}", dir.display()))? .filter_map(|entry| { let entry = entry.ok()?; @@ -77,9 +107,10 @@ pub fn check(dir: &Path, mode: Mode, exclude: &[String]) -> VortexResult<()> { name.ends_with(".vortex").then_some(name) }) .collect(); + dir_files.sort(); // Collect all fixture names (each fixture may produce multiple files). - let fresh_files: Vec = std::fs::read_dir(tmp_dir.path()) + let mut fresh_files: Vec = std::fs::read_dir(tmp_dir.path()) .map_err(|e| vortex_err!("failed to read tmp dir: {e}"))? .filter_map(|entry| { let entry = entry.ok()?; @@ -87,6 +118,7 @@ pub fn check(dir: &Path, mode: Mode, exclude: &[String]) -> VortexResult<()> { name.ends_with(".vortex").then_some(name) }) .collect(); + fresh_files.sort(); let mut result = CheckResult { passed: Vec::new(), @@ -131,9 +163,12 @@ pub fn check(dir: &Path, mode: Mode, exclude: &[String]) -> VortexResult<()> { continue; } + let check_start = Instant::now(); eprintln!(" checking {fresh_name}..."); // Read the stored file. + let read_stored_start = Instant::now(); + eprintln!(" reading stored file..."); let stored_bytes = match std::fs::read(&stored_path) { Ok(b) => ByteBuffer::from(b), Err(e) => { @@ -144,9 +179,16 @@ pub fn check(dir: &Path, mode: Mode, exclude: &[String]) -> VortexResult<()> { continue; } }; + eprintln!( + " read stored file in {:.3}s ({} bytes)", + read_stored_start.elapsed().as_secs_f64(), + stored_bytes.len() + ); // Read the fresh file. let fresh_path = tmp_dir.path().join(fresh_name); + let read_fresh_start = Instant::now(); + eprintln!(" reading fresh file..."); let fresh_bytes = match std::fs::read(&fresh_path) { Ok(b) => ByteBuffer::from(b), Err(e) => { @@ -157,9 +199,16 @@ pub fn check(dir: &Path, mode: Mode, exclude: &[String]) -> VortexResult<()> { continue; } }; + eprintln!( + " read fresh file in {:.3}s ({} bytes)", + read_fresh_start.elapsed().as_secs_f64(), + fresh_bytes.len() + ); // Validate the full layout tree of the stored file (reads every segment // including zone maps, dictionaries, etc.). + let layout_start = Instant::now(); + eprintln!(" validating stored layout tree..."); if let Err(e) = adapter::read_layout_tree(stored_bytes.clone()) { result.failed.push(FailedFixture { name: fresh_name.clone(), @@ -167,8 +216,14 @@ pub fn check(dir: &Path, mode: Mode, exclude: &[String]) -> VortexResult<()> { }); continue; } + eprintln!( + " validated stored layout tree in {:.3}s", + layout_start.elapsed().as_secs_f64() + ); // Scan data arrays from both files and compare. + let decode_stored_start = Instant::now(); + eprintln!(" decoding stored file..."); let stored_array = match adapter::read_file(stored_bytes) { Ok(a) => a, Err(e) => { @@ -179,6 +234,13 @@ pub fn check(dir: &Path, mode: Mode, exclude: &[String]) -> VortexResult<()> { continue; } }; + eprintln!( + " decoded stored file in {:.3}s", + decode_stored_start.elapsed().as_secs_f64() + ); + + let decode_fresh_start = Instant::now(); + eprintln!(" decoding fresh file..."); let fresh_array = match adapter::read_file(fresh_bytes) { Ok(a) => a, Err(e) => { @@ -189,9 +251,22 @@ pub fn check(dir: &Path, mode: Mode, exclude: &[String]) -> VortexResult<()> { continue; } }; + eprintln!( + " decoded fresh file in {:.3}s", + decode_fresh_start.elapsed().as_secs_f64() + ); + let compare_start = Instant::now(); + eprintln!(" comparing arrays..."); assert_arrays_eq!(stored_array, fresh_array); - eprintln!(" pass {fresh_name}"); + eprintln!( + " compared arrays in {:.3}s", + compare_start.elapsed().as_secs_f64() + ); + eprintln!( + " pass {fresh_name} in {:.3}s", + check_start.elapsed().as_secs_f64() + ); result.passed.push(fresh_name.clone()); } diff --git a/vortex-test/compat-gen/src/generate.rs b/vortex-test/compat-gen/src/generate.rs index 7490540f75e..eca430aaf93 100644 --- a/vortex-test/compat-gen/src/generate.rs +++ b/vortex-test/compat-gen/src/generate.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::path::Path; +use std::time::Instant; use base16ct::HexDisplay; use serde::Serialize; @@ -42,17 +43,30 @@ pub fn write_fixtures(output_dir: &Path, exclude: &[String]) -> VortexResult VortexResult Date: Fri, 29 May 2026 13:42:38 +0100 Subject: [PATCH 5/9] more logs Signed-off-by: Robert Kruszewski --- vortex-test/compat-gen/src/adapter.rs | 39 ++++++++++++-- vortex-test/compat-gen/src/fixtures/mod.rs | 62 ++++++++++++++++++++++ 2 files changed, 98 insertions(+), 3 deletions(-) diff --git a/vortex-test/compat-gen/src/adapter.rs b/vortex-test/compat-gen/src/adapter.rs index a97399dac79..bf8442b8152 100644 --- a/vortex-test/compat-gen/src/adapter.rs +++ b/vortex-test/compat-gen/src/adapter.rs @@ -8,6 +8,7 @@ use std::path::Path; use std::sync::Arc; +use std::time::Instant; use futures::stream; use tokio::runtime::Runtime; @@ -23,6 +24,7 @@ use vortex::io::session::RuntimeSessionExt; use vortex::layout::LayoutStrategy; use vortex::layout::layouts::flat::Flat; use vortex::layout::layouts::flat::writer::FlatLayoutStrategy; +use vortex_array::expr::stats::Precision; use vortex_array::expr::stats::Stat; use vortex_array::stream::ArrayStreamAdapter; use vortex_array::stream::ArrayStreamExt; @@ -43,9 +45,40 @@ fn runtime() -> VortexResult { pub fn compute_all_stats(array: &ArrayRef) -> VortexResult<()> { let all_stats: Vec = Stat::all().collect(); let mut ctx = LEGACY_SESSION.create_execution_ctx(); - for node in array.depth_first_traversal() { - let computed = node.statistics().compute_all(&all_stats, &mut ctx)?; - node.statistics().set_iter(computed.into_iter()); + for (idx, node) in array.depth_first_traversal().enumerate() { + eprintln!( + " stats node {}: encoding {}, len {}, dtype {}", + idx + 1, + node.encoding_id(), + node.len(), + node.dtype() + ); + for &stat in &all_stats { + let stat_start = Instant::now(); + eprintln!(" computing {stat}..."); + match node.statistics().compute_stat(stat, &mut ctx)? { + Some(scalar) => { + if let Some(value) = scalar.into_value() { + node.statistics().set(stat, Precision::exact(value)); + eprintln!( + " computed {stat} in {:.3}s", + stat_start.elapsed().as_secs_f64() + ); + } else { + eprintln!( + " skipped {stat} in {:.3}s (no scalar value)", + stat_start.elapsed().as_secs_f64() + ); + } + } + None => { + eprintln!( + " skipped {stat} in {:.3}s (unsupported)", + stat_start.elapsed().as_secs_f64() + ); + } + } + } } Ok(()) } diff --git a/vortex-test/compat-gen/src/fixtures/mod.rs b/vortex-test/compat-gen/src/fixtures/mod.rs index edf0dd47e37..640cc705b0f 100644 --- a/vortex-test/compat-gen/src/fixtures/mod.rs +++ b/vortex-test/compat-gen/src/fixtures/mod.rs @@ -5,6 +5,7 @@ mod arrays; use std::path::Path; use std::sync::Arc; +use std::time::Instant; use vortex::array::ArrayId; use vortex::array::ArrayRef; @@ -83,11 +84,48 @@ impl Fixture for FlatLayoutAdapter { } fn write(&self, dir: &Path) -> VortexResult> { + let fixture_start = Instant::now(); + eprintln!(" building array..."); + let build_start = Instant::now(); let array = self.0.build()?; + eprintln!( + " built array in {:.3}s (len {}, dtype {})", + build_start.elapsed().as_secs_f64(), + array.len(), + array.dtype() + ); + + let encodings_start = Instant::now(); + eprintln!(" checking expected encodings..."); check_expected_encodings(&array, self.0.as_ref())?; + eprintln!( + " checked expected encodings in {:.3}s", + encodings_start.elapsed().as_secs_f64() + ); + + let stats_start = Instant::now(); + eprintln!(" computing all stats..."); compute_all_stats(&array)?; + eprintln!( + " computed all stats in {:.3}s", + stats_start.elapsed().as_secs_f64() + ); + let path = dir.join(self.name()); + let path_display = path.display(); + let write_start = Instant::now(); + eprintln!(" writing {path_display}..."); adapter::write_file(&path, array)?; + let bytes = path.metadata().map(|m| m.len()).unwrap_or(0); + eprintln!( + " wrote {path_display} in {:.3}s ({} bytes)", + write_start.elapsed().as_secs_f64(), + bytes + ); + eprintln!( + " fixture write completed in {:.3}s", + fixture_start.elapsed().as_secs_f64() + ); Ok(vec![FixtureEntry { name: self.name().to_string(), description: self.description().to_string(), @@ -132,17 +170,41 @@ impl Fixture for DatasetFixtureAdapter { } fn write(&self, dir: &Path) -> VortexResult> { + let fixture_start = Instant::now(); + eprintln!(" building dataset..."); + let build_start = Instant::now(); let array = self.inner.build()?; + eprintln!( + " built dataset in {:.3}s (len {}, dtype {})", + build_start.elapsed().as_secs_f64(), + array.len(), + array.dtype() + ); + let path = dir.join(self.name()); + let path_display = path.display(); + let write_start = Instant::now(); if self.compact { + eprintln!(" writing compact compressed fixture to {path_display}..."); let strategy = WriteStrategyBuilder::default() .with_btrblocks_builder(BtrBlocksCompressorBuilder::default().with_compact()) .build(); adapter::write_compressed(&path, array, strategy)?; } else { + eprintln!(" writing regular compressed fixture to {path_display}..."); let strategy = WriteStrategyBuilder::default().build(); adapter::write_compressed(&path, array, strategy)?; } + let bytes = path.metadata().map(|m| m.len()).unwrap_or(0); + eprintln!( + " wrote {path_display} in {:.3}s ({} bytes)", + write_start.elapsed().as_secs_f64(), + bytes + ); + eprintln!( + " fixture write completed in {:.3}s", + fixture_start.elapsed().as_secs_f64() + ); Ok(vec![FixtureEntry { name: self.name().to_string(), description: self.description().to_string(), From c2d7ee07f81d2d759c57d29c55ef4397e0af04f1 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Fri, 29 May 2026 15:42:53 +0100 Subject: [PATCH 6/9] more Signed-off-by: Robert Kruszewski --- vortex-array/src/aggregate_fn/accumulator.rs | 13 ++++++++----- .../src/aggregate_fn/accumulator_grouped.rs | 12 +++++++----- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/vortex-array/src/aggregate_fn/accumulator.rs b/vortex-array/src/aggregate_fn/accumulator.rs index cf5e7f6c219..d87f46517b4 100644 --- a/vortex-array/src/aggregate_fn/accumulator.rs +++ b/vortex-array/src/aggregate_fn/accumulator.rs @@ -4,6 +4,7 @@ use vortex_error::VortexResult; use vortex_error::vortex_ensure; use vortex_error::vortex_err; +use vortex_session::SessionExt; use crate::ArrayRef; use crate::Columnar; @@ -11,6 +12,7 @@ use crate::ExecutionCtx; use crate::aggregate_fn::AggregateFn; use crate::aggregate_fn::AggregateFnRef; use crate::aggregate_fn::AggregateFnVTable; +use crate::aggregate_fn::session::AggregateFnSession; use crate::aggregate_fn::session::AggregateFnSessionExt; use crate::columnar::AnyColumnar; use crate::dtype::DType; @@ -142,7 +144,6 @@ impl DynAccumulator for Accumulator { } let session = ctx.session().clone(); - let aggregate_fns = session.aggregate_fns(); // 1. Kernel registry first: a registered `(encoding, aggregate_fn)` kernel is strictly // more specific than the vtable's `try_accumulate` short-circuit. Checking the @@ -150,8 +151,9 @@ impl DynAccumulator for Accumulator { // `Combined::try_accumulate` always returns true, so a later kernel check would be // unreachable. { - let kernel = - aggregate_fns.find_aggregate_kernel(batch.encoding_id(), self.aggregate_fn.id()); + let kernel = session + .aggregate_fns() + .find_aggregate_kernel(batch.encoding_id(), self.aggregate_fn.id()); if let Some(kernel) = kernel && let Some(result) = kernel.aggregate(&self.aggregate_fn, batch, ctx)? { @@ -182,8 +184,9 @@ impl DynAccumulator for Accumulator { break; } - let kernel = - aggregate_fns.find_aggregate_kernel(batch.encoding_id(), self.aggregate_fn.id()); + let kernel = session + .aggregate_fns() + .find_aggregate_kernel(batch.encoding_id(), self.aggregate_fn.id()); if let Some(kernel) = kernel && let Some(result) = kernel.aggregate(&self.aggregate_fn, &batch, ctx)? { diff --git a/vortex-array/src/aggregate_fn/accumulator_grouped.rs b/vortex-array/src/aggregate_fn/accumulator_grouped.rs index be940d8c474..9304fc181d3 100644 --- a/vortex-array/src/aggregate_fn/accumulator_grouped.rs +++ b/vortex-array/src/aggregate_fn/accumulator_grouped.rs @@ -163,15 +163,17 @@ impl GroupedAccumulator { let mut elements = groups.elements().clone(); let groups_validity = groups.validity()?; let session = ctx.session().clone(); - let aggregate_fns = session.aggregate_fns(); for _ in 0..max_iterations() { if elements.is::() { break; } - if let Some(result) = aggregate_fns - .find_grouped_kernel(elements.encoding_id(), self.aggregate_fn.id()) + let kernel = session + .aggregate_fns() + .find_grouped_kernel(elements.encoding_id(), self.aggregate_fn.id()); + + if let Some(result) = kernel .and_then(|kernel| { // SAFETY: we assume that elements execution is safe let groups = unsafe { @@ -253,14 +255,14 @@ impl GroupedAccumulator { let mut elements = groups.elements().clone(); let groups_validity = groups.validity()?; let session = ctx.session().clone(); - let aggregate_fns = session.aggregate_fns(); for _ in 0..64 { if elements.is::() { break; } - if let Some(result) = aggregate_fns + if let Some(result) = session + .aggregate_fns() .find_grouped_kernel(elements.encoding_id(), self.aggregate_fn.id()) .and_then(|kernel| { // SAFETY: we assume that elements execution is safe From 769cce0ac946782bcb5638c232730e290a23ca8a Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Fri, 29 May 2026 16:07:52 +0100 Subject: [PATCH 7/9] less Signed-off-by: Robert Kruszewski --- vortex-array/src/aggregate_fn/accumulator.rs | 9 +- .../src/aggregate_fn/accumulator_grouped.rs | 16 ++-- vortex-test/compat-gen/src/adapter.rs | 39 +-------- vortex-test/compat-gen/src/check.rs | 87 ++----------------- vortex-test/compat-gen/src/fixtures/mod.rs | 62 ------------- vortex-test/compat-gen/src/generate.rs | 22 +---- 6 files changed, 25 insertions(+), 210 deletions(-) diff --git a/vortex-array/src/aggregate_fn/accumulator.rs b/vortex-array/src/aggregate_fn/accumulator.rs index d87f46517b4..5851a775cca 100644 --- a/vortex-array/src/aggregate_fn/accumulator.rs +++ b/vortex-array/src/aggregate_fn/accumulator.rs @@ -178,16 +178,15 @@ impl DynAccumulator for Accumulator { // Iteration 0 re-checks the initial encoding — a redundant HashMap miss, the price of // keeping the loop body uniform. Terminates on `AnyColumnar` (Canonical or Constant) // since the vtable's `accumulate(&Columnar)` handles both cases directly. + let aggs = session.aggregate_fns(); let mut batch = batch.clone(); for _ in 0..max_iterations() { if batch.is::() { break; } - let kernel = session - .aggregate_fns() - .find_aggregate_kernel(batch.encoding_id(), self.aggregate_fn.id()); - if let Some(kernel) = kernel + if let Some(kernel) = + aggs.find_aggregate_kernel(batch.encoding_id(), self.aggregate_fn.id()) && let Some(result) = kernel.aggregate(&self.aggregate_fn, &batch, ctx)? { vortex_ensure!( @@ -203,6 +202,8 @@ impl DynAccumulator for Accumulator { batch = batch.execute(ctx)?; } + drop(aggs); + // 4. Otherwise, execute the batch until it is columnar and accumulate it into the state. let columnar = batch.execute::(ctx)?; diff --git a/vortex-array/src/aggregate_fn/accumulator_grouped.rs b/vortex-array/src/aggregate_fn/accumulator_grouped.rs index 9304fc181d3..142690a898d 100644 --- a/vortex-array/src/aggregate_fn/accumulator_grouped.rs +++ b/vortex-array/src/aggregate_fn/accumulator_grouped.rs @@ -163,17 +163,15 @@ impl GroupedAccumulator { let mut elements = groups.elements().clone(); let groups_validity = groups.validity()?; let session = ctx.session().clone(); + let aggs = session.aggregate_fns(); for _ in 0..max_iterations() { if elements.is::() { break; } - let kernel = session - .aggregate_fns() - .find_grouped_kernel(elements.encoding_id(), self.aggregate_fn.id()); - - if let Some(result) = kernel + if let Some(result) = aggs + .find_grouped_kernel(elements.encoding_id(), self.aggregate_fn.id()) .and_then(|kernel| { // SAFETY: we assume that elements execution is safe let groups = unsafe { @@ -197,6 +195,8 @@ impl GroupedAccumulator { elements = elements.execute(ctx)?; } + drop(aggs); + // Otherwise, we iterate the offsets and sizes and accumulate each group one by one. let elements = elements.execute::(ctx)?.into_array(); let offsets = groups.offsets(); @@ -255,14 +255,14 @@ impl GroupedAccumulator { let mut elements = groups.elements().clone(); let groups_validity = groups.validity()?; let session = ctx.session().clone(); + let aggs = session.aggregate_fns(); for _ in 0..64 { if elements.is::() { break; } - if let Some(result) = session - .aggregate_fns() + if let Some(result) = aggs .find_grouped_kernel(elements.encoding_id(), self.aggregate_fn.id()) .and_then(|kernel| { // SAFETY: we assume that elements execution is safe @@ -288,6 +288,8 @@ impl GroupedAccumulator { elements = elements.execute(ctx)?; } + drop(aggs); + // Otherwise, we iterate the offsets and sizes and accumulate each group one by one. let elements = elements.execute::(ctx)?.into_array(); let validity = groups_validity.execute_mask(groups.len(), ctx)?; diff --git a/vortex-test/compat-gen/src/adapter.rs b/vortex-test/compat-gen/src/adapter.rs index bf8442b8152..a97399dac79 100644 --- a/vortex-test/compat-gen/src/adapter.rs +++ b/vortex-test/compat-gen/src/adapter.rs @@ -8,7 +8,6 @@ use std::path::Path; use std::sync::Arc; -use std::time::Instant; use futures::stream; use tokio::runtime::Runtime; @@ -24,7 +23,6 @@ use vortex::io::session::RuntimeSessionExt; use vortex::layout::LayoutStrategy; use vortex::layout::layouts::flat::Flat; use vortex::layout::layouts::flat::writer::FlatLayoutStrategy; -use vortex_array::expr::stats::Precision; use vortex_array::expr::stats::Stat; use vortex_array::stream::ArrayStreamAdapter; use vortex_array::stream::ArrayStreamExt; @@ -45,40 +43,9 @@ fn runtime() -> VortexResult { pub fn compute_all_stats(array: &ArrayRef) -> VortexResult<()> { let all_stats: Vec = Stat::all().collect(); let mut ctx = LEGACY_SESSION.create_execution_ctx(); - for (idx, node) in array.depth_first_traversal().enumerate() { - eprintln!( - " stats node {}: encoding {}, len {}, dtype {}", - idx + 1, - node.encoding_id(), - node.len(), - node.dtype() - ); - for &stat in &all_stats { - let stat_start = Instant::now(); - eprintln!(" computing {stat}..."); - match node.statistics().compute_stat(stat, &mut ctx)? { - Some(scalar) => { - if let Some(value) = scalar.into_value() { - node.statistics().set(stat, Precision::exact(value)); - eprintln!( - " computed {stat} in {:.3}s", - stat_start.elapsed().as_secs_f64() - ); - } else { - eprintln!( - " skipped {stat} in {:.3}s (no scalar value)", - stat_start.elapsed().as_secs_f64() - ); - } - } - None => { - eprintln!( - " skipped {stat} in {:.3}s (unsupported)", - stat_start.elapsed().as_secs_f64() - ); - } - } - } + for node in array.depth_first_traversal() { + let computed = node.statistics().compute_all(&all_stats, &mut ctx)?; + node.statistics().set_iter(computed.into_iter()); } Ok(()) } diff --git a/vortex-test/compat-gen/src/check.rs b/vortex-test/compat-gen/src/check.rs index ad10d8201d3..7533ee9dab7 100644 --- a/vortex-test/compat-gen/src/check.rs +++ b/vortex-test/compat-gen/src/check.rs @@ -2,7 +2,6 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::path::Path; -use std::time::Instant; use clap::ValueEnum; use serde::Serialize; @@ -64,42 +63,13 @@ pub fn check(dir: &Path, mode: Mode, exclude: &[String]) -> VortexResult<()> { // Generate fresh fixtures into a temp directory. let tmp_dir = tempfile::tempdir().map_err(|e| vortex_err!("failed to create temp dir: {e}"))?; - let generation_start = Instant::now(); - eprintln!( - "generating {} fresh fixtures for comparison in {}...", - fixtures.len(), - tmp_dir.path().display() - ); - for (idx, fixture) in fixtures.iter().enumerate() { - let fixture_start = Instant::now(); - eprintln!( - " generating {}/{} {}...", - idx + 1, - fixtures.len(), - fixture.name() - ); - let entries = fixture.write(tmp_dir.path())?; - let written = entries - .iter() - .map(|entry| entry.name.as_str()) - .collect::>() - .join(", "); - eprintln!( - " generated {}/{} {} in {:.3}s: {}", - idx + 1, - fixtures.len(), - fixture.name(), - fixture_start.elapsed().as_secs_f64(), - written - ); + eprintln!("generating fresh fixtures for comparison..."); + for fixture in &fixtures { + fixture.write(tmp_dir.path())?; } - eprintln!( - "generated fresh fixtures in {:.3}s", - generation_start.elapsed().as_secs_f64() - ); // Collect .vortex files in the check directory. - let mut dir_files: Vec = std::fs::read_dir(dir) + let dir_files: Vec = std::fs::read_dir(dir) .map_err(|e| vortex_err!("failed to read dir {}: {e}", dir.display()))? .filter_map(|entry| { let entry = entry.ok()?; @@ -107,10 +77,9 @@ pub fn check(dir: &Path, mode: Mode, exclude: &[String]) -> VortexResult<()> { name.ends_with(".vortex").then_some(name) }) .collect(); - dir_files.sort(); // Collect all fixture names (each fixture may produce multiple files). - let mut fresh_files: Vec = std::fs::read_dir(tmp_dir.path()) + let fresh_files: Vec = std::fs::read_dir(tmp_dir.path()) .map_err(|e| vortex_err!("failed to read tmp dir: {e}"))? .filter_map(|entry| { let entry = entry.ok()?; @@ -118,7 +87,6 @@ pub fn check(dir: &Path, mode: Mode, exclude: &[String]) -> VortexResult<()> { name.ends_with(".vortex").then_some(name) }) .collect(); - fresh_files.sort(); let mut result = CheckResult { passed: Vec::new(), @@ -163,12 +131,9 @@ pub fn check(dir: &Path, mode: Mode, exclude: &[String]) -> VortexResult<()> { continue; } - let check_start = Instant::now(); eprintln!(" checking {fresh_name}..."); // Read the stored file. - let read_stored_start = Instant::now(); - eprintln!(" reading stored file..."); let stored_bytes = match std::fs::read(&stored_path) { Ok(b) => ByteBuffer::from(b), Err(e) => { @@ -179,16 +144,9 @@ pub fn check(dir: &Path, mode: Mode, exclude: &[String]) -> VortexResult<()> { continue; } }; - eprintln!( - " read stored file in {:.3}s ({} bytes)", - read_stored_start.elapsed().as_secs_f64(), - stored_bytes.len() - ); // Read the fresh file. let fresh_path = tmp_dir.path().join(fresh_name); - let read_fresh_start = Instant::now(); - eprintln!(" reading fresh file..."); let fresh_bytes = match std::fs::read(&fresh_path) { Ok(b) => ByteBuffer::from(b), Err(e) => { @@ -199,16 +157,9 @@ pub fn check(dir: &Path, mode: Mode, exclude: &[String]) -> VortexResult<()> { continue; } }; - eprintln!( - " read fresh file in {:.3}s ({} bytes)", - read_fresh_start.elapsed().as_secs_f64(), - fresh_bytes.len() - ); // Validate the full layout tree of the stored file (reads every segment // including zone maps, dictionaries, etc.). - let layout_start = Instant::now(); - eprintln!(" validating stored layout tree..."); if let Err(e) = adapter::read_layout_tree(stored_bytes.clone()) { result.failed.push(FailedFixture { name: fresh_name.clone(), @@ -216,14 +167,8 @@ pub fn check(dir: &Path, mode: Mode, exclude: &[String]) -> VortexResult<()> { }); continue; } - eprintln!( - " validated stored layout tree in {:.3}s", - layout_start.elapsed().as_secs_f64() - ); // Scan data arrays from both files and compare. - let decode_stored_start = Instant::now(); - eprintln!(" decoding stored file..."); let stored_array = match adapter::read_file(stored_bytes) { Ok(a) => a, Err(e) => { @@ -234,13 +179,6 @@ pub fn check(dir: &Path, mode: Mode, exclude: &[String]) -> VortexResult<()> { continue; } }; - eprintln!( - " decoded stored file in {:.3}s", - decode_stored_start.elapsed().as_secs_f64() - ); - - let decode_fresh_start = Instant::now(); - eprintln!(" decoding fresh file..."); let fresh_array = match adapter::read_file(fresh_bytes) { Ok(a) => a, Err(e) => { @@ -251,22 +189,9 @@ pub fn check(dir: &Path, mode: Mode, exclude: &[String]) -> VortexResult<()> { continue; } }; - eprintln!( - " decoded fresh file in {:.3}s", - decode_fresh_start.elapsed().as_secs_f64() - ); - let compare_start = Instant::now(); - eprintln!(" comparing arrays..."); assert_arrays_eq!(stored_array, fresh_array); - eprintln!( - " compared arrays in {:.3}s", - compare_start.elapsed().as_secs_f64() - ); - eprintln!( - " pass {fresh_name} in {:.3}s", - check_start.elapsed().as_secs_f64() - ); + eprintln!(" pass {fresh_name}"); result.passed.push(fresh_name.clone()); } diff --git a/vortex-test/compat-gen/src/fixtures/mod.rs b/vortex-test/compat-gen/src/fixtures/mod.rs index 640cc705b0f..edf0dd47e37 100644 --- a/vortex-test/compat-gen/src/fixtures/mod.rs +++ b/vortex-test/compat-gen/src/fixtures/mod.rs @@ -5,7 +5,6 @@ mod arrays; use std::path::Path; use std::sync::Arc; -use std::time::Instant; use vortex::array::ArrayId; use vortex::array::ArrayRef; @@ -84,48 +83,11 @@ impl Fixture for FlatLayoutAdapter { } fn write(&self, dir: &Path) -> VortexResult> { - let fixture_start = Instant::now(); - eprintln!(" building array..."); - let build_start = Instant::now(); let array = self.0.build()?; - eprintln!( - " built array in {:.3}s (len {}, dtype {})", - build_start.elapsed().as_secs_f64(), - array.len(), - array.dtype() - ); - - let encodings_start = Instant::now(); - eprintln!(" checking expected encodings..."); check_expected_encodings(&array, self.0.as_ref())?; - eprintln!( - " checked expected encodings in {:.3}s", - encodings_start.elapsed().as_secs_f64() - ); - - let stats_start = Instant::now(); - eprintln!(" computing all stats..."); compute_all_stats(&array)?; - eprintln!( - " computed all stats in {:.3}s", - stats_start.elapsed().as_secs_f64() - ); - let path = dir.join(self.name()); - let path_display = path.display(); - let write_start = Instant::now(); - eprintln!(" writing {path_display}..."); adapter::write_file(&path, array)?; - let bytes = path.metadata().map(|m| m.len()).unwrap_or(0); - eprintln!( - " wrote {path_display} in {:.3}s ({} bytes)", - write_start.elapsed().as_secs_f64(), - bytes - ); - eprintln!( - " fixture write completed in {:.3}s", - fixture_start.elapsed().as_secs_f64() - ); Ok(vec![FixtureEntry { name: self.name().to_string(), description: self.description().to_string(), @@ -170,41 +132,17 @@ impl Fixture for DatasetFixtureAdapter { } fn write(&self, dir: &Path) -> VortexResult> { - let fixture_start = Instant::now(); - eprintln!(" building dataset..."); - let build_start = Instant::now(); let array = self.inner.build()?; - eprintln!( - " built dataset in {:.3}s (len {}, dtype {})", - build_start.elapsed().as_secs_f64(), - array.len(), - array.dtype() - ); - let path = dir.join(self.name()); - let path_display = path.display(); - let write_start = Instant::now(); if self.compact { - eprintln!(" writing compact compressed fixture to {path_display}..."); let strategy = WriteStrategyBuilder::default() .with_btrblocks_builder(BtrBlocksCompressorBuilder::default().with_compact()) .build(); adapter::write_compressed(&path, array, strategy)?; } else { - eprintln!(" writing regular compressed fixture to {path_display}..."); let strategy = WriteStrategyBuilder::default().build(); adapter::write_compressed(&path, array, strategy)?; } - let bytes = path.metadata().map(|m| m.len()).unwrap_or(0); - eprintln!( - " wrote {path_display} in {:.3}s ({} bytes)", - write_start.elapsed().as_secs_f64(), - bytes - ); - eprintln!( - " fixture write completed in {:.3}s", - fixture_start.elapsed().as_secs_f64() - ); Ok(vec![FixtureEntry { name: self.name().to_string(), description: self.description().to_string(), diff --git a/vortex-test/compat-gen/src/generate.rs b/vortex-test/compat-gen/src/generate.rs index eca430aaf93..7490540f75e 100644 --- a/vortex-test/compat-gen/src/generate.rs +++ b/vortex-test/compat-gen/src/generate.rs @@ -2,7 +2,6 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::path::Path; -use std::time::Instant; use base16ct::HexDisplay; use serde::Serialize; @@ -43,30 +42,17 @@ pub fn write_fixtures(output_dir: &Path, exclude: &[String]) -> VortexResult VortexResult Date: Fri, 29 May 2026 21:54:15 +0100 Subject: [PATCH 8/9] lint Signed-off-by: Robert Kruszewski --- vortex-array/src/aggregate_fn/accumulator.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/vortex-array/src/aggregate_fn/accumulator.rs b/vortex-array/src/aggregate_fn/accumulator.rs index 5851a775cca..64ff49481c0 100644 --- a/vortex-array/src/aggregate_fn/accumulator.rs +++ b/vortex-array/src/aggregate_fn/accumulator.rs @@ -4,7 +4,6 @@ use vortex_error::VortexResult; use vortex_error::vortex_ensure; use vortex_error::vortex_err; -use vortex_session::SessionExt; use crate::ArrayRef; use crate::Columnar; @@ -12,7 +11,6 @@ use crate::ExecutionCtx; use crate::aggregate_fn::AggregateFn; use crate::aggregate_fn::AggregateFnRef; use crate::aggregate_fn::AggregateFnVTable; -use crate::aggregate_fn::session::AggregateFnSession; use crate::aggregate_fn::session::AggregateFnSessionExt; use crate::columnar::AnyColumnar; use crate::dtype::DType; From 10f8dfe33eadb5ac559714a3796f237fe8f61a9a Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Fri, 29 May 2026 23:24:02 +0100 Subject: [PATCH 9/9] more Signed-off-by: Robert Kruszewski --- vortex-array/src/aggregate_fn/accumulator.rs | 8 +++----- vortex-array/src/aggregate_fn/accumulator_grouped.rs | 12 ++++-------- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/vortex-array/src/aggregate_fn/accumulator.rs b/vortex-array/src/aggregate_fn/accumulator.rs index 64ff49481c0..c89418e67a6 100644 --- a/vortex-array/src/aggregate_fn/accumulator.rs +++ b/vortex-array/src/aggregate_fn/accumulator.rs @@ -176,15 +176,15 @@ impl DynAccumulator for Accumulator { // Iteration 0 re-checks the initial encoding — a redundant HashMap miss, the price of // keeping the loop body uniform. Terminates on `AnyColumnar` (Canonical or Constant) // since the vtable's `accumulate(&Columnar)` handles both cases directly. - let aggs = session.aggregate_fns(); let mut batch = batch.clone(); for _ in 0..max_iterations() { if batch.is::() { break; } - if let Some(kernel) = - aggs.find_aggregate_kernel(batch.encoding_id(), self.aggregate_fn.id()) + if let Some(kernel) = session + .aggregate_fns() + .find_aggregate_kernel(batch.encoding_id(), self.aggregate_fn.id()) && let Some(result) = kernel.aggregate(&self.aggregate_fn, &batch, ctx)? { vortex_ensure!( @@ -200,8 +200,6 @@ impl DynAccumulator for Accumulator { batch = batch.execute(ctx)?; } - drop(aggs); - // 4. Otherwise, execute the batch until it is columnar and accumulate it into the state. let columnar = batch.execute::(ctx)?; diff --git a/vortex-array/src/aggregate_fn/accumulator_grouped.rs b/vortex-array/src/aggregate_fn/accumulator_grouped.rs index 142690a898d..3fae7a85bf3 100644 --- a/vortex-array/src/aggregate_fn/accumulator_grouped.rs +++ b/vortex-array/src/aggregate_fn/accumulator_grouped.rs @@ -163,14 +163,14 @@ impl GroupedAccumulator { let mut elements = groups.elements().clone(); let groups_validity = groups.validity()?; let session = ctx.session().clone(); - let aggs = session.aggregate_fns(); for _ in 0..max_iterations() { if elements.is::() { break; } - if let Some(result) = aggs + if let Some(result) = session + .aggregate_fns() .find_grouped_kernel(elements.encoding_id(), self.aggregate_fn.id()) .and_then(|kernel| { // SAFETY: we assume that elements execution is safe @@ -195,8 +195,6 @@ impl GroupedAccumulator { elements = elements.execute(ctx)?; } - drop(aggs); - // Otherwise, we iterate the offsets and sizes and accumulate each group one by one. let elements = elements.execute::(ctx)?.into_array(); let offsets = groups.offsets(); @@ -255,14 +253,14 @@ impl GroupedAccumulator { let mut elements = groups.elements().clone(); let groups_validity = groups.validity()?; let session = ctx.session().clone(); - let aggs = session.aggregate_fns(); for _ in 0..64 { if elements.is::() { break; } - if let Some(result) = aggs + if let Some(result) = session + .aggregate_fns() .find_grouped_kernel(elements.encoding_id(), self.aggregate_fn.id()) .and_then(|kernel| { // SAFETY: we assume that elements execution is safe @@ -288,8 +286,6 @@ impl GroupedAccumulator { elements = elements.execute(ctx)?; } - drop(aggs); - // Otherwise, we iterate the offsets and sizes and accumulate each group one by one. let elements = elements.execute::(ctx)?.into_array(); let validity = groups_validity.execute_mask(groups.len(), ctx)?;