feat: Add pluggable StatisticsRegistry for operator-level statistics propagation (apache#21483)
## Which issue does this PR close?
- Part of apache#21443 (Pluggable operator-level statistics propagation)
- Part of apache#8227 (statistics improvements epic)
## Rationale for this change
DataFusion's built-in statistics propagation has no extension point:
downstream projects cannot inject external catalog stats, override
built-in estimation, or plug in custom strategies without forking.
This PR introduces `StatisticsRegistry`, a pluggable
chain-of-responsibility for operator-level statistics following the same
pattern as `RelationPlanner` for SQL parsing and `ExpressionAnalyzer`
(apache#21120) for expression-level stats. See apache#21443 for full motivation and
design context.
## What changes are included in this PR?
1. Framework (`operator_statistics/mod.rs`): `StatisticsProvider` trait,
`StatisticsRegistry` (chain-of-responsibility), `ExtendedStatistics`
(Statistics + type-erased extension map), `DefaultStatisticsProvider`.
`PhysicalOptimizerContext` trait with `optimize_with_context` dispatch.
`SessionState` integration.
2. Built-in providers for Filter, Projection, Passthrough
(sort/repartition/etc), Aggregate, Join
(hash/sort-merge/nested-loop/cross), Limit, and Union. NDV utilities:
`num_distinct_vals`, `ndv_after_selectivity`.
3. `ClosureStatisticsProvider`: closure-based provider for test
injection and cardinality feedback.
4. JoinSelection integration: `use_statistics_registry` config flag
(default false), registry-aware `optimize_with_context`, SLT test
demonstrating plan difference on skewed data.
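
The chain-of-responsibility idea in item 1 can be sketched in a few lines. This is a minimal illustration, not the PR's API: `Plan`, `Answer`, `Provider`, `Registry`, `FixedSelectivityFilter`, and `DefaultProvider` are hypothetical stand-ins that track only a row count, whereas the real `StatisticsProvider` works on `ExecutionPlan` nodes and `ExtendedStatistics`.

```rust
/// Hypothetical plan node for illustration; not DataFusion's `ExecutionPlan`.
enum Plan {
    Scan { rows: Option<usize> },
    Filter { input: Box<Plan> },
}

/// What a provider returns: an estimate, or a request to try the next provider.
enum Answer {
    Computed(Option<usize>),
    Delegate,
}

/// Hypothetical provider interface: estimate output rows from child estimates.
trait Provider {
    fn estimate(&self, plan: &Plan, child_rows: &[Option<usize>]) -> Answer;
}

/// Providers are consulted in order; the first one that computes a value wins.
struct Registry {
    providers: Vec<Box<dyn Provider>>,
}

impl Registry {
    fn compute(&self, plan: &Plan) -> Option<usize> {
        // Estimate children first so providers see registry-derived inputs.
        let child_rows: Vec<Option<usize>> = match plan {
            Plan::Scan { .. } => Vec::new(),
            Plan::Filter { input } => vec![self.compute(input)],
        };
        for provider in &self.providers {
            if let Answer::Computed(rows) = provider.estimate(plan, &child_rows) {
                return rows;
            }
        }
        None // every provider delegated
    }
}

/// Custom provider: applies a fixed selectivity (in percent) to filters only.
struct FixedSelectivityFilter {
    percent: usize,
}

impl Provider for FixedSelectivityFilter {
    fn estimate(&self, plan: &Plan, child_rows: &[Option<usize>]) -> Answer {
        match (plan, child_rows.first()) {
            (Plan::Filter { .. }, Some(Some(n))) => {
                Answer::Computed(Some(n * self.percent / 100))
            }
            _ => Answer::Delegate,
        }
    }
}

/// Fallback at the end of the chain: scans report their own row count,
/// filters pass the child estimate through unchanged.
struct DefaultProvider;

impl Provider for DefaultProvider {
    fn estimate(&self, plan: &Plan, child_rows: &[Option<usize>]) -> Answer {
        match plan {
            Plan::Scan { rows } => Answer::Computed(*rows),
            Plan::Filter { .. } => Answer::Computed(child_rows.first().copied().flatten()),
        }
    }
}
```

Placing the custom provider ahead of the default one overrides filter estimation while leaving every other operator on the fallback path, which is the kind of override the rationale describes.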
## Are these changes tested?
- 39 unit tests covering all providers, NDV utilities, chain priority,
and edge cases (Inexact precision, Absent propagation, Partial aggregate
delegation, GROUPING SETS delegation, join-type bounds, multi-key NDV,
exact Cartesian product, CrossJoin, GlobalLimit skip+fetch)
- 1 SLT test (`statistics_registry.slt`): three-table join on skewed
data (8:1:1 customer_id distribution) where the built-in NDV formula
estimates 33 rows (wrong; actual=66) and the registry conservatively
estimates 100, producing the correct build-side swap
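
One set of inputs that reproduces the three figures quoted above (33, 66, 100) is a join of two 10-row inputs whose key column has three distinct values in an 8:1:1 split. The SLT file's exact tables are not shown here, so the row counts and the textbook formula `|L| * |R| / max(ndv_L, ndv_R)` below are assumptions chosen to match those numbers, and the function names are hypothetical.

```rust
use std::collections::HashMap;

/// Textbook equi-join estimate that assumes keys are uniformly distributed.
fn ndv_join_estimate(
    left_rows: usize,
    right_rows: usize,
    left_ndv: usize,
    right_ndv: usize,
) -> usize {
    let ndv = left_ndv.max(right_ndv).max(1); // avoid dividing by zero
    left_rows * right_rows / ndv
}

/// Exact inner-join output size: each left row matches as many right rows
/// as share its key.
fn actual_join_rows(left: &[u32], right: &[u32]) -> usize {
    let mut freq: HashMap<u32, usize> = HashMap::new();
    for key in right {
        *freq.entry(*key).or_insert(0) += 1;
    }
    left.iter()
        .map(|key| freq.get(key).copied().unwrap_or(0))
        .sum()
}

/// Ten keys with an 8:1:1 skew across three distinct values (assumed data).
fn skewed_keys() -> Vec<u32> {
    let mut keys: Vec<u32> = vec![1; 8];
    keys.push(2);
    keys.push(3);
    keys
}
```

Under these assumptions the uniform formula gives 10 * 10 / 3 = 33, the true output is 8 * 8 + 1 + 1 = 66, and the Cartesian upper bound is 10 * 10 = 100. The uniform estimate undercounts because most rows share a single key.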
## Are there any user-facing changes?
New public API (purely additive, non-breaking):
- `StatisticsProvider` trait and `StatisticsRegistry` in
`datafusion-physical-plan`
- `ExtendedStatistics`, `StatisticsResult` types; built-in provider
structs; `num_distinct_vals`, `ndv_after_selectivity` utilities
- `PhysicalOptimizerContext` trait and `ConfigOnlyContext` in
`datafusion-physical-optimizer`
- `SessionState::statistics_registry()`,
`SessionStateBuilder::with_statistics_registry()`
- Config: `datafusion.optimizer.use_statistics_registry` (default false)
Default behavior is unchanged. The registry is only consulted when the
flag is explicitly enabled.
Known limitations:
- Column-level stats (NDV, min/max) at Join/Aggregate/Union/Limit
boundaries are not improved: these operators call
`partition_statistics(None)` internally, re-fetching raw child stats and
discarding registry enrichment. 4 TODO comments mark the affected call
sites; apache#20184 would close this gap.
- No `ExpressionAnalyzer` integration yet (apache#21122).
---
Disclaimer: I used AI to assist in the code generation. I have manually
reviewed the output, and it matches my intention and understanding.
+use datafusion_physical_plan::operator_statistics::StatisticsRegistry;
 use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 use std::sync::Arc;
 
@@ -53,36 +56,49 @@ impl JoinSelection {
     }
 }
 
+/// Get statistics for a plan node, using the registry if available.
+fn get_stats(
+    plan: &dyn ExecutionPlan,
+    registry: Option<&StatisticsRegistry>,
+) -> Result<Arc<Statistics>> {
+    if let Some(reg) = registry {
+        reg.compute(plan)
+            .map(|s| Arc::<Statistics>::clone(s.base_arc()))
+    } else {
+        plan.partition_statistics(None)
+    }
+}
+
 // TODO: We need some performance test for Right Semi/Right Join swap to Left Semi/Left Join in case that the right side is smaller but not much smaller.
 // TODO: In PrestoSQL, the optimizer flips join sides only if one side is much smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times, by default is 8 times.
 /// Checks whether join inputs should be swapped using available statistics.
 ///
 /// It follows these steps:
-/// 1. Compare the in-memory sizes of both sides, and place the smaller side on
+/// 1. If a [`StatisticsRegistry`] is provided, use it for cross-operator estimates
+///    (e.g., intermediate join outputs that would otherwise have `Absent` statistics).
+/// 2. Compare the in-memory sizes of both sides, and place the smaller side on
 ///    the left (build) side.
-/// 2. If in-memory byte sizes are unavailable, fall back to row counts.
-/// 3. Do not reorder the join if neither statistic is available, or if
+/// 3. If in-memory byte sizes are unavailable, fall back to row counts.
+/// 4. Do not reorder the join if neither statistic is available, or if
 ///    `datafusion.optimizer.join_reordering` is disabled.
 ///
-///
 /// Used configurations inside arg `config`
 /// - `config.optimizer.join_reordering`: allows or forbids statistics-driven join swapping
 pub(crate) fn should_swap_join_order(
     left: &dyn ExecutionPlan,
     right: &dyn ExecutionPlan,
     config: &ConfigOptions,
+    registry: Option<&StatisticsRegistry>,
 ) -> Result<bool> {
     if !config.optimizer.join_reordering {
         return Ok(false);
     }
 
-    // Get the left and right table's total bytes
-    // If both the left and right tables contain total_byte_size statistics,
-    // use `total_byte_size` to determine `should_swap_join_order`, else use `num_rows`
-    let left_stats = left.partition_statistics(None)?;
-    let right_stats = right.partition_statistics(None)?;
-    // First compare `total_byte_size` of left and right side,
-    // if information in this field is insufficient fallback to the `num_rows`
+    let left_stats = get_stats(left, registry)?;
+    let right_stats = get_stats(right, registry)?;
+
+    // First compare total_byte_size, then fall back to num_rows if byte
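
The four steps in the doc comment above can be sketched as a standalone function. This is an illustration under simplifying assumptions, not the code from the diff: `Stats`, `Plan`, `Registry`, `FallbackRows`, and `should_swap` are hypothetical, `Option` stands in for `Precision` (with `None` playing the role of `Absent`), and errors are ignored.

```rust
/// Simplified stand-in for `Statistics`; `None` means the value is absent.
#[derive(Debug, Clone, Copy, Default)]
struct Stats {
    total_byte_size: Option<usize>,
    num_rows: Option<usize>,
}

/// Hypothetical plan node that only carries its raw statistics.
struct Plan {
    raw: Stats,
}

/// Hypothetical registry interface returning enriched statistics for a node.
trait Registry {
    fn compute(&self, plan: &Plan) -> Stats;
}

/// Example registry: supplies a fixed row estimate when the raw one is absent.
struct FallbackRows(usize);

impl Registry for FallbackRows {
    fn compute(&self, plan: &Plan) -> Stats {
        Stats {
            num_rows: plan.raw.num_rows.or(Some(self.0)),
            ..plan.raw
        }
    }
}

/// Step 1: prefer the registry when one is supplied, else use raw statistics.
fn get_stats(plan: &Plan, registry: Option<&dyn Registry>) -> Stats {
    match registry {
        Some(reg) => reg.compute(plan),
        None => plan.raw,
    }
}

/// Returns true when the left input looks larger, so that swapping puts the
/// smaller input on the left (build) side.
fn should_swap(
    left: &Plan,
    right: &Plan,
    join_reordering: bool,
    registry: Option<&dyn Registry>,
) -> bool {
    if !join_reordering {
        return false; // step 4: reordering disabled
    }
    let l = get_stats(left, registry);
    let r = get_stats(right, registry);
    match (l.total_byte_size, r.total_byte_size) {
        // Step 2: compare byte sizes when both are known.
        (Some(lb), Some(rb)) => lb > rb,
        // Step 3: otherwise fall back to row counts.
        _ => match (l.num_rows, r.num_rows) {
            (Some(lr), Some(rr)) => lr > rr,
            // Step 4: nothing to compare, keep the current order.
            _ => false,
        },
    }
}
```

In this sketch, a left input with no raw statistics (such as an intermediate join output) never triggers a swap on its own. Once a registry supplies a row estimate for it, the comparison in step 3 can fire.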