IGNITE-14777 window functions support#12096
Conversation
9b522ec to
f04ed9f
Compare
f04ed9f to
dbf1e84
Compare
|
@oleg-zinovev, I've partially reviewed your PR. Review not completed yet, but I have some comments to publish. Also there are a lot of codestyle violations. Please read the https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines article about Ignite codestyle. Most of the problems can be detected automatically (for example using command:
|
| * - project removing constants | ||
| */ | ||
| @Value.Enclosing | ||
| public class ProjectWindowConstantsRule extends RelRule<ProjectWindowConstantsRule.Config> implements TransformationRule { |
There was a problem hiding this comment.
It's a goal of AccumulatorsFactory in/out adapters to provide rows suitable for accumulator. Let's use this logic instead of creating addional rel ops.
There was a problem hiding this comment.
The implementation of aggregates in Ignite relies on the indices of values in the row instead of evaluating expressions in AggregateCall#rexList. In this case, constants must be present in the row before it is processed by the aggregate.
If we want to eliminate a separate rule that implements this using projections, I can suggest one of the following options:
-
Redesign AccumulatorsFactory so that it uses AggregateCall#rexList instead of AggregateCall#argList when computing the aggregate, and replace RexInputRef for window constants with RexLiteral in the WindowConverterRule.
-
Add projection of constants when computing window functions so that the row passed to accumulators contains window constants in the required positions. However, in this case, the implementation of WindowNode will include logic that already exists in ProjectionNode.
-
Implement a separate factory for aggregates (or add new methods in the current one) that works directly with RexWinAggCall. In this case, AccumulatorsFactory (or the new factory for window functions) will need to handle a set of RexNode operands, but this will not affect the current logic for regular AggregateCall. Also, this would allow abandoning the transformation of RexWinAggCall into AggregateCall and, thus, avoid copying groups in WindowConverterRule. However, in this scenario, I might need to change the visibility of classes in the package org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.
Let me know if you'd like to propose a more robust solution.
There was a problem hiding this comment.
First approach looks intresting, I think it's most performant solution. But If it brings a lot of complexity to the current patch I think we can keep current approach, and fix it later by different ticket.
There was a problem hiding this comment.
@alex-plekhanov
Hi,
Could you please take a look at commit e9f9a37?
I removed projection execution as a separate step during query planning (ProjectWindowConstantsRule). Projections are now applied directly during window function evaluation.
However, I have a couple of concerns:
-
The number of projections will now depend on how many window functions are invoked
-
Re-scanning a window partition will require recomputing the projections
I think I can address the first issue by refactoring WindowFunctionFactory so that a single projection is shared across all functions within a group.
But I’m not sure what the best approach is for the second one, other than introducing a cache of already projected rows—which would increase memory usage during execution.
There was a problem hiding this comment.
As far as I understand now each calculation of each aggregate call will apply project, which will create the new row and copy input row to this row. In this case it's even more resource consuming than ProjectWindowConstantsRule. Maybe it's better to revert to ProjectWindowConstantsRule.
In my opinion right way to implement this is to provide some list of precompiled functions (columns projections List<Function<Row, T>>) to AbstractWindowFunction (and AbstractAccumulator) and implement get method in these classes like:
(T)projects.get(arguments().get(idx)).apply(row)
instead of:
(T)hnd.get(arguments().get(idx), row)
In this case no additional rows will be created and only required column modifications will be applied.
But let's do it later, by another ticket, this ticket is already too complex.
There was a problem hiding this comment.
Hi,
The exact same idea came to my mind about an hour after the commit... :)
I agree, this approach looks like the better one, although it will require some rework in the accumulators.
There was a problem hiding this comment.
Let's revert current implementation and return ProjectWindowConstantsRule back. Optimization with list of precompiled functions can be made by another ticket.
There was a problem hiding this comment.
I brought back ProjectWindowConstantsRule (renaming it to WindowConstantsRule).
At the same time, I reworked its logic:
- If an aggregation function call inside LogicalWindow contains a reference to a constant, projections will still be added as before.
- Otherwise, only references in window bounds will be replaced with constant values.
This way, projections will be added only for cases with constant parameters in aggregation function calls. This avoids adding extra projections when the function depends only on input fields.
I also added a test that uses constants in PARTITION BY and ORDER BY to verify that these constants do not require reference replacement.
Calcite removes such grouping and sorting keys on its own (org.apache.calcite.rel.logical.LogicalWindow#addWindows)
| partition = partitionFactory.get(); | ||
| } | ||
| else if (prevRow != null && partCmp != null && partCmp.compare(prevRow, row) != 0) { | ||
| partition.drainTo(rowFactory, outBuf); |
There was a problem hiding this comment.
- This operation can block the thread for a long time.
- Large amount of rows can be stored in outBuf, in worth case there will be 2x input rows count (in partition and in outBuf)
Consider pushing directly to downstream. Drain (and push) only requested amount and postpone next pushes until next request.
There was a problem hiding this comment.
Let's fix it with another ticket, it's not a blocker but complicate the implementation
| doPush(); | ||
| } | ||
| else | ||
| nodeMemoryTracker.onRowAdded(row); |
There was a problem hiding this comment.
Can be false positive memory limit exceed error. Tracker need to be reset not only on rewindInternal, but also when partition is fully drained.
Test required for new node (see MemoryQuotasIntegrationTest)
| Supplier<WindowPartition<Row>> partitionFactory, | ||
| RowHandler.RowFactory<Row> rowFactory | ||
| ) { | ||
| super(ctx, rowType, DFLT_ROW_OVERHEAD); |
There was a problem hiding this comment.
Memory tracker row overhead depends at least on count of aggregates (maybe also kinds of aggregates)
| register(SqlStdOperatorTable.BIT_XOR); | ||
|
|
||
| // Window specific operations | ||
| register(SqlStdOperatorTable.ROW_NUMBER); |
There was a problem hiding this comment.
Smoke test for each operand required in StdSqlOperatorsTest
This comment was marked as outdated.
This comment was marked as outdated.
dbf1e84 to
9f135fa
Compare
This comment was marked as outdated.
This comment was marked as outdated.
…g into window partition factory call, window exclusion validation
9f135fa to
13dd848
Compare
| else { | ||
| Row offsetRow = frame.get(idx); | ||
| Object val = get(0, offsetRow); | ||
| if (val == null) { |
There was a problem hiding this comment.
Based on the description of LAG/LEAD function (and other database behavior), we have to return a value even if it is NULL. A default value returns only in the case when a row does not exist.
statement ok
CREATE TABLE t_lag_lead(id INTEGER, val INTEGER);
statement ok
INSERT INTO t_lag_lead VALUES (1, 10), (2, NULL), (3, 30);
query IIII
SELECT id, val,
LAG(val, 1, 999) OVER (ORDER BY id),
LEAD(val, 1, 999) OVER (ORDER BY id)
FROM t_lag_lead
ORDER BY id;
----
1 10 999 NULL
2 NULL 10 30
3 30 NULL 999
There was a problem hiding this comment.
Hi.
Fixed except the following:
If the third argument of lag/lead is non-nullable, Calcite changes the return type of the function to non-nullable (org.apache.calcite.sql.fun.SqlLeadLagAggFunction#transformType).
Because of this, your example returns 0 instead of null.
Not entirely sure what the best way to handle this.
There was a problem hiding this comment.
Probably we have to rewrite (add CAST: LAG(val, 1, 999) <=> LAG(val, 1, CAST(999 AS INTEGER)) in the rule or something else; I didn't test it) to support expected behavior. But anyway it is not a crucial point; we can do it in the future.
If the fix is not trivial, just leave a comment (TODO that pointed out of the specific JIRA ticket).
The Ignite community can fix it in the future.
There was a problem hiding this comment.
If you write such a CAST directly in the query, nothing changes (org.apache.ignite.internal.processors.query.calcite.prepare.IgniteTypeCoercion#syncAttributes receives two non-nullable data types).
At the same time, inferReturnType is called only during query validation and conversion from SqlNode to RelNode.
I added a custom implementation with overridden inferReturnType.
| ); | ||
|
|
||
| /** */ | ||
| Supplier<WindowPartition<Row>> windowPartitionFactory( |
There was a problem hiding this comment.
ExpressionFactory is mostly for RexNode to something convertion. windowPartitionFactory is not directly related to RexNode, so, maybe it's better to use partition factory constructor instead of new ExpressionFactory method.
|
@oleg-zinovev, sorry for delay with review. |
This comment was marked as outdated.
This comment was marked as outdated.
| F.asList( | ||
| F.asList(1), | ||
| F.asList(2), | ||
| F.asList(1), | ||
| F.asList(2), | ||
| F.asList(3), | ||
| F.asList(1) | ||
| ), |
There was a problem hiding this comment.
I don't like the idea of specifying expected results as parameters. Maybe it's better to describe cases inside executeWindow method? For example, declare method:
private void checkWindow(Window.Group grp, boolean streaming, Object[][] expRes) {
Move content of executeWindow to this method, and call it from executeWIndow like:
checkWindow(rowNumber(), true, new Object[][] {{1}, {2}, {1}, {2}, {3}, {1}});
checkWindow(rowNumber(), false, new Object[][] {{1}, {2}, {1}, {2}, {3}, {1}});
checkWindow(countRows(RexWindowBounds.UNBOUNDED_PRECEDING, RexWindowBounds.CURRENT_ROW), false,
new Object[][] {{1}, {2}, {1}, {2}, {3}, {1}});
...
| * - project removing constants | ||
| */ | ||
| @Value.Enclosing | ||
| public class ProjectWindowConstantsRule extends RelRule<ProjectWindowConstantsRule.Config> implements TransformationRule { |
There was a problem hiding this comment.
First approach looks intresting, I think it's most performant solution. But If it brings a lot of complexity to the current patch I think we can keep current approach, and fix it later by different ticket.
# Conflicts: # modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java # modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
1a3964c to
7285f91
Compare
|
@alex-plekhanov |
| @SuppressWarnings({"rawtypes", "unchecked"}) | ||
| class RelJson { | ||
| /** */ | ||
| /** */ |
There was a problem hiding this comment.
All these changes to comments with two spaces (/* */) should be reverted to comments with one space.
# Conflicts: # modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
|
@vldpyatkov I found your PR (#12815). I’d like to point out that our solutions can coexist, and I can try to integrate them if either of the PRs gets accepted. I think this could be a nice addition to my implementation :) |
|
@oleg-zinovev If your PR were to be merged finally, I do not think that mine would ever be necessary. |
| * - project removing constants | ||
| */ | ||
| @Value.Enclosing | ||
| public class ProjectWindowConstantsRule extends RelRule<ProjectWindowConstantsRule.Config> implements TransformationRule { |
There was a problem hiding this comment.
Let's revert current implementation and return ProjectWindowConstantsRule back. Optimization with list of precompiled functions can be made by another ticket.
| RelCollation sortCollation = RelCollations.of( | ||
| TraitUtils.createFieldCollation(0, true), | ||
| TraitUtils.createFieldCollation(1, true) | ||
| ); |
There was a problem hiding this comment.
TraitUtils.createCollation(F.asList(0, 1))
| RelCollation derivedCollation = RelCollations.of( | ||
| TraitUtils.createFieldCollation(0, true) | ||
| ); |
There was a problem hiding this comment.
TraitUtils.createCollation(F.asList(0))
|
|
||
| assertPlan(sql, publicSchema, nodeOrAnyChild(isInstanceOf(IgniteSort.class) | ||
| .and(it -> it.collation().equals(sortCollation)) | ||
| .and(input(isInstanceOf(IgniteWindow.class) |
There was a problem hiding this comment.
Let's also check that IgniteWindow has another sorting below
There was a problem hiding this comment.
@alex-plekhanov
Hi,
Could you explain why this check is needed? If IgniteWindow does not contain the sort below, then the enforced IgniteSort will be created by org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention#enforce.
There was a problem hiding this comment.
To ensure that we correctly pass through the collation to child node.
For example, you can check this case by replacing the last line in IgniteWindow.passThroughTraits method:
return Pair.of(traits, ImmutableList.of(traits.replace(RelCollations.EMPTY)));
Window node will use correct collation, but without sort node underneath.
| ImmutableBitSet inputColls = ImmutableBitSet.range(input.getRowType().getFieldCount()); | ||
|
|
||
| List<Integer> newCollationColls = maxPrefix(requiredCollation.getKeys(), inputColls.asSet()); | ||
| List<RelFieldCollation> newCollationFields = requiredCollation.getFieldCollations() | ||
| .stream().filter(k -> newCollationColls.contains(k.getFieldIndex())).collect(Collectors.toList()); | ||
|
|
||
| RelCollation newCollation = RelCollations.of(newCollationFields); | ||
|
|
||
| traits = traits.replace(newCollation); |
There was a problem hiding this comment.
Maybe something like:
for (int i = 0; i < requiredCollation.getFieldCollations().size(); i++) {
if (requiredCollation.getFieldCollations().get(i).getFieldIndex() >= input.getRowType().getFieldCount()) {
traits = traits.replace(RelCollations.of(requiredCollation.getFieldCollations().subList(0, i)));
break;
}
}
I think it's simplier and easier to understand. Up to you.
…stantRule), fix LEAD/LAG nullability.
|
|
||
| assertPlan(sql, publicSchema, nodeOrAnyChild(isInstanceOf(IgniteSort.class) | ||
| .and(it -> it.collation().equals(sortCollation)) | ||
| .and(input(isInstanceOf(IgniteWindow.class) |
There was a problem hiding this comment.
To ensure that we correctly pass through the collation to child node.
For example, you can check this case by replacing the last line in IgniteWindow.passThroughTraits method:
return Pair.of(traits, ImmutableList.of(traits.replace(RelCollations.EMPTY)));
Window node will use correct collation, but without sort node underneath.
| */ | ||
| @Value.Enclosing | ||
| public class WindowConstantsRule extends RelRule<WindowConstantsRule.Config> implements TransformationRule { | ||
| /** */ |
There was a problem hiding this comment.
Codestyle: Redundant space in comments
|
|
||
| /** {@link SqlLeadLagAggFunction}, with enforced return type nullability. */ | ||
| public class SqlLeadLagFunction extends SqlLeadLagAggFunction { | ||
|
|
There was a problem hiding this comment.
Codestyle: Redundant line
| private Integer cachedEndOffset; | ||
|
|
||
| /** */ | ||
| /** */ |
There was a problem hiding this comment.
Codestyle: Redundant space in comment
| } | ||
|
|
||
| /** */ | ||
| /** */ |
There was a problem hiding this comment.
Codestyle: Redundant space in comment
|
|
||
| /** Collects constants, used in visited {@link RexNode}. */ | ||
| private static final class ConstantRefCollector extends RexShuttle { | ||
|
|
| return super.visitInputRef(inputRef); | ||
| } | ||
| } | ||
|
|
| List<RexNode> newOperands = call.getOperands().stream() | ||
| .map(arg -> arg.accept(visitor)) | ||
| .collect(Collectors.toList()); |
There was a problem hiding this comment.
List<RexNode> newOperands = Commons.transform(call.getOperands(), arg -> arg.accept(visitor));
| public void testConstantsInPartitionByAndOrderBy() throws Exception { | ||
| String sql = "SELECT MAX(VALUE) OVER (PARTITION BY 1 ORDER BY 2) FROM AFFINITY_TBL"; | ||
|
|
||
| assertPlan(sql, publicSchema, nodeOrAnyChild(isInstanceOf(IgniteWindow.class) |
There was a problem hiding this comment.
Maybe also check that window has no projects underneath?
Thank you for submitting the pull request to the Apache Ignite.
In order to streamline the review of the contribution
we ask you to ensure the following steps have been taken:
The Contribution Checklist
The description explains WHAT and WHY was made instead of HOW.
The following pattern must be used:
IGNITE-XXXX Change summarywhereXXXX- number of JIRA issue.(see the Maintainers list)
the
green visaattached to the JIRA ticket (see TC.Bot: Check PR)Notes
If you need any help, please email dev@ignite.apache.org or ask anу advice on http://asf.slack.com #ignite channel.
Description of Changes:
Added support for planning and executing window functions.
Added special window functions: row_number, rank, dense_rank, percent_rank, cume_dist, ntile, nth_value, first_value, last_value, lag, lead.
Provides two modes of window function execution:
Supports specifying integer offsets in either variant (ROWS / RANGE) and time interval offsets for RANGE.
Window Planning:
During query planning, windows are split into separate rels for each group of aggregation functions. Each logical window rel includes a collation that is required for correctly partitioning rows and defining frames when computing the window.
Splitting is done using Calcite’s standard rule, which groups function calls based on the window specification (according to the OVER clause).
After that, constants used in the window are projected to support referencing them in the current implementation of Ignite aggregates. (If constants are used in FOLLOWING/PRECEDING, they are directly substituted into the offset, which helps reduce the number of frame boundary searches.)
An additional planning phase was introduced specifically for window planning. (I couldn't find a suitable existing place for the new rules, so I followed the approach used in Apache Drill.)
Separate Change:
During development, when attempting to upgrade to Calcite 1.39, it was discovered that IgniteTypeFactory#leastRestrictive does not take into account the nullability of the resulting type when merging FLOAT and DOUBLE.
P.S. I'll be appreciate to any feedback