[CALCITE-7618] Add filter pushdown support to the file adapter's CSV table implementation #5048
Diveyam-Mishra wants to merge 2 commits into
protected CsvTableScan(RelOptCluster cluster, RelOptTable table,
    CsvTranslatableTable csvTable, int[] fields,
    @Nullable String @Nullable [] filterValues) {
I think CsvEnumerator is actually broken, since it does string comparisons.
This means for example that 0.0 != 0 in a filter.
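The mismatch described above can be reproduced in isolation. The following is a minimal sketch, not the adapter's actual code: the class and both method names are hypothetical, and it assumes the cell and the filter value both arrive as raw CSV text.

```java
import java.math.BigDecimal;

public class StringVsNumericFilter {
  /** String-based match: compares the raw CSV text character by character. */
  public static boolean matchesAsString(String cell, String filterValue) {
    return filterValue.equals(cell);
  }

  /** Numeric match: parses both sides and compares them by value. */
  public static boolean matchesAsNumber(String cell, String filterValue) {
    return new BigDecimal(cell).compareTo(new BigDecimal(filterValue)) == 0;
  }

  public static void main(String[] args) {
    // The same pair of values gives different answers.
    System.out.println(matchesAsString("0.0", "0")); // false
    System.out.println(matchesAsNumber("0.0", "0")); // true
  }
}
```

A filter that is meant to follow SQL semantics would need to compare values after conversion to the column type, as in the second method.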
I might have complicated a few things: I kept getting style errors locally and tried to fix them, but I may have been doing something wrong. I tried stopping the daemon thread and rebuilding, yet something went haywire. If needed, I can open a new PR with a single proper commit.
Please use fresh commits until we finish the review, to make it easier to see what changed in response to reviewers.
if (o1 == null || o2 == null) {
  return false;
}
if (o1 instanceof BigDecimal && o2 instanceof BigDecimal) {
Why is this case needed? Doesn't BigDecimal have equals?
If it does, can this become Objects.equals()?
The core problem is that BigDecimal violates the intuitive expectation that "same number = equal object":
new BigDecimal("2.0").equals(new BigDecimal("2.00")) // false
new BigDecimal("2.0").compareTo(new BigDecimal("2.00")) == 0 // true
How about using compareTo for everything and using Comparable for o1 and o2?
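A compareTo-based helper along those lines might look like the sketch below. The class and method names are hypothetical and this is not the code in the PR. It assumes both values have already been converted to the column's Java type, and it guards on the runtime class because calling compareTo across different classes (for example Integer against Long) would throw a ClassCastException.

```java
import java.math.BigDecimal;

public class ComparableEquality {
  /**
   * Null-safe equality based on compareTo. Returns false if either side is
   * null, uses compareTo when both values are Comparable instances of the
   * same class, and falls back to equals otherwise.
   */
  @SuppressWarnings({"unchecked", "rawtypes"})
  public static boolean valuesEqual(Object o1, Object o2) {
    if (o1 == null || o2 == null) {
      return false;
    }
    if (o1 instanceof Comparable && o1.getClass() == o2.getClass()) {
      return ((Comparable) o1).compareTo(o2) == 0;
    }
    return o1.equals(o2);
  }

  public static void main(String[] args) {
    // BigDecimal values that differ only in scale compare as equal.
    System.out.println(
        valuesEqual(new BigDecimal("2.0"), new BigDecimal("2.00"))); // true
    System.out.println(valuesEqual("a", "a")); // true
    System.out.println(valuesEqual(null, null)); // false
  }
}
```

With this shape the BigDecimal special case disappears, since BigDecimal is handled by the generic Comparable branch.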
 * {@link CsvTableScan}.
 *
 * <p>Only equality conditions of the form {@code column = literal} can be
 * pushed down, because {@link CsvEnumerator} only supports per-column
Could this situation be improved? Is this a fundamental limitation of CsvEnumerator?
Maybe we need a more powerful enumerator.
In principle I think any predicate of the current row value should work.
My current plan is to introduce a CsvFilter abstraction to represent the subset of filters that can be pushed down (initially AND, OR, = and <>, including null comparisons). Rather than encoding pushdown state as column-value arrays, the planner will build a CsvFilter tree, serialize it, and pass the serialized representation through CsvTableScan/CsvTranslatableTable to CsvEnumerator, where it will be deserialized and evaluated against each row.
The CsvFilter classes are intended to be a lightweight data model representing pushdownable predicates, while evaluation, serialization/deserialization, and pretty-printing remain separate concerns. This keeps the representation extensible for additional pushdown operators in the future without requiring further changes to the transport mechanism between planning and execution.
There is one more option, which is to do exactly what Spark does and compile the filter all the way down to actual bytecode, but in my opinion that's a bit overkill.
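To make the CsvFilter idea concrete, here is a rough sketch of what such a tree and its evaluator could look like. Every name in it (Comparison, Junction, eq, ne, and, or, matches) is hypothetical and none of it comes from the PR. Serialization is left out, and a comparison involving null is simply treated as not matching, which is enough for a WHERE clause built only from AND, OR, = and <>.

```java
import java.util.Arrays;
import java.util.List;

public class CsvFilterSketch {
  /** Hypothetical data model: a tree of pushdownable predicates. */
  public interface CsvFilter {}

  /** column = literal, or column <> literal when negated is true. */
  public static final class Comparison implements CsvFilter {
    final int column;
    final Object literal;
    final boolean negated;

    Comparison(int column, Object literal, boolean negated) {
      this.column = column;
      this.literal = literal;
      this.negated = negated;
    }
  }

  /** AND (conjunction is true) or OR (conjunction is false) over operands. */
  public static final class Junction implements CsvFilter {
    final boolean conjunction;
    final List<CsvFilter> operands;

    Junction(boolean conjunction, List<CsvFilter> operands) {
      this.conjunction = conjunction;
      this.operands = operands;
    }
  }

  public static CsvFilter eq(int column, Object literal) {
    return new Comparison(column, literal, false);
  }

  public static CsvFilter ne(int column, Object literal) {
    return new Comparison(column, literal, true);
  }

  public static CsvFilter and(CsvFilter... operands) {
    return new Junction(true, Arrays.asList(operands));
  }

  public static CsvFilter or(CsvFilter... operands) {
    return new Junction(false, Arrays.asList(operands));
  }

  /** Evaluation lives outside the data model, against one converted row. */
  public static boolean matches(CsvFilter filter, Object[] row) {
    if (filter instanceof Junction) {
      Junction j = (Junction) filter;
      for (CsvFilter operand : j.operands) {
        boolean m = matches(operand, row);
        if (j.conjunction && !m) {
          return false; // AND short-circuits on the first failure
        }
        if (!j.conjunction && m) {
          return true; // OR short-circuits on the first success
        }
      }
      return j.conjunction;
    }
    Comparison c = (Comparison) filter;
    Object value = c.column < row.length ? row[c.column] : null;
    if (value == null || c.literal == null) {
      return false; // simplification: a null comparison never matches
    }
    return value.equals(c.literal) != c.negated;
  }

  public static void main(String[] args) {
    // deptno = 20 AND (name <> 'Bob' OR empno = 100)
    CsvFilter f = and(eq(2, 20), or(ne(1, "Bob"), eq(0, 100)));
    System.out.println(matches(f, new Object[] {100, "Bob", 20})); // true
    System.out.println(matches(f, new Object[] {110, "Bob", 20})); // false
  }
}
```

Keeping the node classes free of behavior is what lets evaluation, serialization, and pretty-printing be added as separate visitors or helpers later.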
Calcite already includes a compiler which generates the enumerable code, why can't the same compiler generate the filter implementation as a compiled Java function? Then you can support arbitrary functions.
  sql("model-with-custom-table", sql).ok();
}

/** Test case for
It would be nice to have higher coverage in terms of SQL types for columns.
Pull request overview
This PR adds planner-rule-based filter pushdown for the file adapter’s CSV tables by carrying a pushed-down filter condition inside CsvTableScan and compiling it into a runtime predicate during enumerable implementation. It also strengthens CSV row conversion and expands tests to validate predicate behavior (including null-handling and short rows).
Changes:
- Added CsvFilterTableScanRule and CsvProjectFilterTableScanRule, and registered them via FileRules / CsvTableScan#register.
- Extended CsvTableScan to carry a @Nullable RexNode condition, emit it in EXPLAIN, adjust costing, and apply it via a compiled Predicate1 in implement.
- Updated CsvEnumerator conversion and added tests around missing fields and equality/null semantics, plus additional plan/result coverage in adapter/example tests.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| file/src/test/java/org/apache/calcite/adapter/file/FileAdapterTest.java | Adds/updates tests for pushdown behavior, plans, and null/equality semantics. |
| file/src/test/java/org/apache/calcite/adapter/file/CsvEnumeratorTest.java | Adds a test covering conversion when CSV rows are shorter than the projected schema. |
| file/src/main/java/org/apache/calcite/adapter/file/FileRules.java | Registers new planner rules and documents their intent. |
| file/src/main/java/org/apache/calcite/adapter/file/CsvTableScan.java | Stores pushed-down filter condition and applies it during enumerable implementation. |
| file/src/main/java/org/apache/calcite/adapter/file/CsvProjectTableScanRule.java | Adjusts projection pushdown mapping through existing scan.fields and adds a condition guard. |
| file/src/main/java/org/apache/calcite/adapter/file/CsvProjectFilterTableScanRule.java | New rule to push filter into scan and remap input refs for project/filter when combined. |
| file/src/main/java/org/apache/calcite/adapter/file/CsvFilterTableScanRule.java | New rule to push LogicalFilter condition into CsvTableScan. |
| file/src/main/java/org/apache/calcite/adapter/file/CsvEnumerator.java | Makes converter reusable, adds safer field access, adds objectsEqual, and modifies filter evaluation loop. |
| example/csv/src/test/java/org/apache/calcite/test/CsvTest.java | Adds example tests validating equality semantics with nulls under filterable model. |
Comments suppressed due to low confidence (1)
file/src/main/java/org/apache/calcite/adapter/file/CsvEnumerator.java:318
- Filtering uses strings[i] while iterating up to filterValues.size(). If a CSV row has fewer columns than the schema (which this PR now explicitly supports via field(strings, idx)), this will throw ArrayIndexOutOfBoundsException during filtering. Use the safe field(...) accessor (and treat missing fields as non-matching when a filter value is required).
if (filterValues != null) {
  for (int i = 0; i < filterValues.size(); i++) {
    String filterValue = filterValues.get(i);
    if (filterValue != null) {
      if (!filterValue.equals(strings[i])) {
        continue outer;
      }
    }
  }
}
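The suggested fix can be sketched as follows. The field helper here is a hypothetical stand-in for the accessor the comment mentions, since its real signature and behavior are not shown in this thread, and rowMatches is an illustrative name for the body of the filtering loop.

```java
import java.util.Arrays;
import java.util.List;

public class SafeFieldFilter {
  /** Returns the field at idx, or null when the row is too short. */
  public static String field(String[] strings, int idx) {
    return idx < strings.length ? strings[idx] : null;
  }

  /** Returns whether the row satisfies every non-null filter value. */
  public static boolean rowMatches(String[] strings, List<String> filterValues) {
    for (int i = 0; i < filterValues.size(); i++) {
      String filterValue = filterValues.get(i);
      // equals(null) is false, so a missing field never matches a filter.
      if (filterValue != null && !filterValue.equals(field(strings, i))) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<String> filters = Arrays.asList(null, null, "20");
    System.out.println(rowMatches(new String[] {"100", "Fred", "20"}, filters)); // true
    // Short row: no exception, the row is simply filtered out.
    System.out.println(rowMatches(new String[] {"110", "Eric"}, filters)); // false
  }
}
```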
RexToLixTranslator.translateCondition(
    program,
    implementor.getTypeFactory(),
    builder,
    inputGetter,
    null,
    implementor.getConformance());
/** Rule that matches a {@link org.apache.calcite.rel.core.Filter} on
 * a {@link CsvTableScan} and pushes arbitrary predicates into the scan.
 * Any {@link org.apache.calcite.rex.RexNode} condition is compiled at plan
 * time via {@link org.apache.calcite.adapter.enumerable.RexToLixTranslator}
 * into a {@link org.apache.calcite.linq4j.function.Predicate1}. */
public static final CsvFilterTableScanRule FILTER_SCAN =
    CsvFilterTableScanRule.Config.DEFAULT.toRule();

/** Rule that matches a {@link org.apache.calcite.rel.core.Project} on
 * a {@link org.apache.calcite.rel.core.Filter} on a {@link CsvTableScan}
 * and pushes down simple equality predicates. */
public static final CsvProjectFilterTableScanRule PROJECT_FILTER_SCAN =
    CsvProjectFilterTableScanRule.Config.DEFAULT.toRule();
@Test void testNonPushableFilterRemains() {
  // empno > 110 is a range filter; under the compiler-based filter pushdown
  // it is pushed down into the scan, leaving only the projection on top.
  final String sql = "select name from EMPS where empno > 110";
Jira Link
[CALCITE-7618]
Changes Proposed
This PR implements filter pushdown support for the file adapter's CSV table using a planner-rule-based approach instead of a FilterableTable interface. This allows Calcite to make more intelligent planning decisions, estimate cost reductions, and display pushed-down predicates in EXPLAIN plans.
Implementation Details:
- Added CsvFilterTableScanRule, which matches LogicalFilter on a CsvTableScan and pushes simple equality predicates (col = literal) into the scan.
- Added CsvProjectFilterTableScanRule, which matches LogicalProject → LogicalFilter → CsvTableScan and pushes down the filter first, preventing the planner from prematurely collapsing projects and filters into a generic EnumerableCalc and bypassing pushdown.
- Extended CsvTableScan to store and propagate @Nullable String[] filterValues.
- Updated CsvTableScan#computeSelfCost to reduce planning cost proportionally to the number of pushed-down filters.
- Updated CsvTableScan#explainTerms to format filters as filters=[[colIndex=value]] in EXPLAIN outputs.
- Added CsvTranslatableTable#scan(DataContext, int[], String[]), which is dynamically invoked by the generated code when filters are present.
- Made CsvEnumerator#converter package-private so it can be reused inside CsvTranslatableTable to resolve correct row converters (ensuring single-column projections return raw objects rather than Object[] arrays to prevent class cast errors).
- Added tests in FileAdapterTest.java verifying pushdown, projection combination, result correctness, and non-pushable residual filter persistence.
- Updated testPushDownProjectAggregateWithFilter to reflect the newly optimized scan plans.
To verify the change, run:
.\sqlline.bat -u "jdbc:calcite:model=file/src/test/resources/smart.json" -n admin -p admin -e "!set maxwidth 10000" -e "explain plan for select name, empno from EMPS where deptno = 20"
Before this change, the plan was:
PLAN=EnumerableCalc(expr#0..2=[{inputs}], expr#3=[20], expr#4=[=($t2, $t3)], NAME=[$t1], EMPNO=[$t0], $condition=[$t4])
CsvTableScan(table=[[SALES, EMPS]], fields=[[0, 1, 2]])
After this change, the filter and projection are pushed down into CsvTableScan, resulting in:
CsvTableScan(table=[[SALES, EMPS]], fields=[[1, 0]], filters=[[2=20]])
This demonstrates that the scan now reads only the required columns (name, empno) and applies the deptno = 20 filter during the table scan itself.