Sorted Merge: Eliminate coordinator Sort node for multi-shard ORDER BY queries #8529
neildsh wants to merge 34 commits into citusdata:main
Conversation
Codecov Report

```diff
@@            Coverage Diff             @@
##             main    #8529      +/-   ##
==========================================
+ Coverage   88.88%   88.90%   +0.01%
==========================================
  Files         286      287       +1
  Lines       63763    63898     +135
  Branches     8017     8050      +33
==========================================
+ Hits        56678    56806     +128
  Misses       4776     4776
- Partials     2309     2316       +7
```
Phase 1 of the sorted-merge feature. This commit adds the data structures and GUC needed by later phases, with zero behavioral changes:

- SortedMergeKey typedef in multi_physical_planner.h describing one sort key for the coordinator k-way merge
- useSortedMerge, sortedMergeKeys[], sortedMergeKeyCount fields on DistributedPlan (plan-time decision, never checked at runtime via the GUC)
- sortedMergeEligible field on MultiExtendedOp (logical optimizer tag read by the physical planner)
- Hidden GUC citus.enable_sorted_merge (PGC_SUSET, default off, GUC_NO_SHOW_ALL) consulted only during planning
- Serialization in citus_outfuncs.c and deep copy in citus_copyfuncs.c for all new fields

All new fields default to false/0/NULL. Existing regression tests are unaffected.

Co-authored-by: Copilot
Phase 2 of the sorted-merge feature. Workers now sort their results when citus.enable_sorted_merge is enabled at planning time, even for queries without LIMIT. The plan metadata is populated so later phases can execute the merge and set pathkeys.

Logical optimizer changes (multi_logical_optimizer.c):
- WorkerSortClauseList() gains an early-return path that pushes the sort clause to workers when the GUC is on and the sort is safe (no aggregates in ORDER BY, no non-pushable window functions, and either no GROUP BY or GROUP BY on the partition column).
- WorkerExtendedOpNode() sets sortedMergeEligible = true when the worker sort clause semantically matches the original sort clause, using the new SortClauseListsMatch() helper.
- SortClauseListsMatch() compares tleSortGroupRef, sortop, nulls_first, and eqop for each pair.

Physical planner changes (multi_physical_planner.c):
- CreatePhysicalDistributedPlan() finds the worker MultiExtendedOp with sortedMergeEligible = true, builds SortedMergeKey metadata from the worker job query, and sets useSortedMerge on the plan.
- BuildSortedMergeKeys() constructs the key array from the worker query's SortGroupClause list and target list.

The coordinator Sort node is still present above the CustomScan (pathkeys are not set yet; that is Phase 4). Results are correct because the redundant Sort re-sorts already-sorted data.

Co-authored-by: Copilot
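The matching helper is small enough to sketch from the commit message alone. This is an illustrative reconstruction of `SortClauseListsMatch()`, not the PR's verbatim code:

```c
/*
 * Illustrative sketch of SortClauseListsMatch() as described in the
 * commit message above: two sort clause lists match when each pair of
 * SortGroupClause entries agrees on the fields that determine sort
 * semantics. The PR's actual code may differ.
 */
static bool
SortClauseListsMatch(List *sortClauseListA, List *sortClauseListB)
{
	if (list_length(sortClauseListA) != list_length(sortClauseListB))
	{
		return false;
	}

	ListCell *lcA = NULL;
	ListCell *lcB = NULL;
	forboth(lcA, sortClauseListA, lcB, sortClauseListB)
	{
		SortGroupClause *a = (SortGroupClause *) lfirst(lcA);
		SortGroupClause *b = (SortGroupClause *) lfirst(lcB);

		if (a->tleSortGroupRef != b->tleSortGroupRef ||
			a->sortop != b->sortop ||
			a->nulls_first != b->nulls_first ||
			a->eqop != b->eqop)
		{
			return false;
		}
	}

	return true;
}
```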
Phase 3 of the sorted-merge feature. When distributedPlan->useSortedMerge is true (set at planning time by Phase 2), the adaptive executor now:

1. Routes worker results into per-task tuple stores via a new PerTaskDispatchTupleDest that dispatches by task->taskId hash lookup. No Task fields are mutated; all state lives on DistributedExecution.
2. After all tasks complete, performs a k-way merge of the per-task stores into the final scanState->tuplestorestate using PostgreSQL's public binaryheap and SortSupport APIs.
3. Frees per-task stores after the merge.

The existing CitusExecScan / ReturnTupleFromTuplestore / CitusEndScan / CitusReScan code paths are completely unchanged; they read from the final tuplestore exactly as before.

New files:
- sorted_merge.h: CreatePerTaskDispatchDest, MergePerTaskStoresIntoFinalStore
- sorted_merge.c: PerTaskDispatchTupleDest with taskId->index hash routing, MergePerTaskStoresIntoFinalStore with binaryheap merge, MergeHeapComparator modeled after PG's heap_compare_slots in nodeMergeAppend.c

Modified:
- adaptive_executor.c: DistributedExecution gains useSortedMerge / perTaskStores / perTaskStoreCount fields. AdaptiveExecutor() branches on useSortedMerge to create per-task stores, then merges post-execution. EXPLAIN ANALYZE falls back to the existing single-tuplestore path.

Safety:
- The shared TupleDestinationStats preserves citus.max_intermediate_result_size
- Per-task stores are allocated in the AdaptiveExecutor local memory context (auto-cleanup on error via PG memory context teardown)
- task->totalReceivedTupleData tracking is preserved

The coordinator Sort node is still present above the CustomScan (pathkeys are not set until Phase 4). Results are correct because the redundant Sort re-sorts already-sorted data.

Co-authored-by: Copilot
…cases to impose a deterministic order
…n indication of sorted merge in the EXPLAIN output
…them at execution time using the job query.
If we don't have any cases where the non-streaming version performs faster than the streaming one, then I think we should always do a streaming merge when citus.enable_sorted_merge is enabled, and we should remove citus.enable_streaming_sorted_merge. Do you think this makes sense?
Once we settle the matter above, I'll provide a deeper code review and would like to test the PR, so I'm waiting for now. By the way, I had a chance to take a closer look at the PR, and at the high level it's definitely going in the right direction, thanks a lot!
Also, just to make sure I'm making the correct assumptions after yesterday's call: if we choose to always do a streaming merge, do we have a way to detect "early" that a backward scan is requested and that we cannot fulfill it? "Early" ideally means "in the planner", which I don't think is quite possible, or else in the executor. I mean, I'd even be okay if we fall back to the older behavior (where we do a final sort on the coordinating node) in the executor by emitting a notice, once / if we can detect that a backward scan is requested, even before trying to fetch the first tuple, though I'm not able to comment on whether that is feasible either.
If neither is possible, then yes, the current behavior with streaming merge is just fine, if we choose to drop the "non-streaming" mode.
Also, I'd like to mention a few things we can try before merging the PR, or occasionally during development:
- We should run the sorted merge tests with the GUC "disabled" in the test file. The expectation would be to get the same test output (except for the SET .. GUC change); this feels like a good way to validate the output.
- We should hardcode the default setting for the GUC to "ON" and push a new branch to CI to see whether the rest of the test suite is still happy while the GUC is turned ON, except for any notice / debug messages added for the new feature.
Also, I didn't yet have a chance to check the regression tests added in this branch (although I see plenty of new ones, thanks!), but I'd still like to mention a few test cases that we might want to add:
- insert .. select .. order by .. limit
- repartition joins
- router (single shard) queries
- tables with dropped columns (super important)
- weird / interesting queries, such as:
- select from dist order by column_a; -- yes, Postgres allows selecting 0 columns, and so does Citus - make sure we don't crash
- select .. from .. order by .. limit 0; -- empty task tuple stores
- select .. from .. order by ..; -- on a distributed table that contains all its rows in a single shard
Sorted Merge: Eliminate coordinator Sort node for multi-shard ORDER BY queries
Summary
This PR adds a sorted-merge execution path for multi-shard `SELECT ... ORDER BY` queries and now also includes the streaming sorted-merge adapter work from #8545.

Today, without this feature, Citus collects worker results and lets PostgreSQL place a `Sort` node above `Custom Scan (Citus Adaptive)` to produce the final global order. With sorted merge enabled, Citus pushes safe `ORDER BY` clauses to workers, marks the `CustomPath` as already sorted via `pathkeys`, and k-way merges the pre-sorted worker results on the coordinator using PostgreSQL's `binaryheap` and `SortSupport` APIs. That lets PostgreSQL elide the coordinator `Sort` node.

There are two hidden experimental GUCs:

| GUC | Default | Context |
| --- | --- | --- |
| `citus.enable_sorted_merge` | off | `PGC_SUSET` |
| `citus.enable_streaming_sorted_merge` | off | `PGC_USERSET` |

The implementation follows the same sorted-input merge pattern as PostgreSQL's `MergeAppend`.

Plan shape
Without sorted merge:
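An illustrative plan shape for a simple `SELECT ... ORDER BY a` (assumed; not this PR's verbatim EXPLAIN output):

```
 Sort
   Sort Key: remote_scan.a
   ->  Custom Scan (Citus Adaptive)
         Task Count: 4
```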
With sorted merge, eager mode:
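With the feature on, the coordinator `Sort` disappears and the pre-sorted worker results are merged into the scan's tuplestore before the first tuple is returned (illustrative):

```
 Custom Scan (Citus Adaptive)
   Task Count: 4
```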
With sorted merge, streaming mode:
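Streaming mode keeps the same plan shape; the difference is at execution time, where tuples are pulled from the merge adapter one at a time, so a parent `LIMIT` can stop early (illustrative):

```
 Custom Scan (Citus Adaptive)
   Task Count: 4
```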
The streaming adapter is still post-execution: `RunDistributedExecution()` fills the per-task stores first, then `FinalizeSortedMerge()` attaches the adapter. The streaming benefit is that the final merged tuplestore copy is skipped and a parent `LIMIT` can stop pulling from the merge adapter early.

Current control flow
Worker sort pushdown (`multi_logical_optimizer.c`)

- `WorkerSortClauseList()` has an early path for `citus.enable_sorted_merge=on`.
- It pushes `ORDER BY` to workers even without `LIMIT` when the sort is safe: no aggregate in the ORDER BY expression, only pushable window functions, and either no GROUP BY or a partition-compatible GROUP BY. (A sketch of this test follows the list.)
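A minimal sketch of that safety test, with loudly labeled assumptions: `EnableSortedMerge` stands in for the `citus.enable_sorted_merge` GUC variable, and `SortListContainsAggregates()` / `GroupedByPartitionColumn()` are hypothetical helper names, not the PR's actual code:

```c
/*
 * Sketch of the pushdown-safety test described above. Helper names are
 * hypothetical; only the conditions themselves come from the PR text.
 */
static bool
SortPushdownIsSafe(List *sortClauseList, List *groupClauseList,
				   List *targetList, bool hasNonPushableWindowFunctions)
{
	if (!EnableSortedMerge)
	{
		/* the feature is consulted only at planning time */
		return false;
	}

	if (SortListContainsAggregates(sortClauseList, targetList))
	{
		/* no aggregates may appear in the ORDER BY expressions */
		return false;
	}

	if (hasNonPushableWindowFunctions)
	{
		/* only pushable window functions are allowed */
		return false;
	}

	/* either no GROUP BY, or a GROUP BY compatible with the partition column */
	return groupClauseList == NIL ||
		   GroupedByPartitionColumn(groupClauseList, targetList);
}
```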
Logical / physical eligibility
- `WorkerExtendedOpNode()` copies the logical `queryOrderByLimit.sortedMergeEligible` decision into the worker `MultiExtendedOp`.
- `UseSortedMerge()` in `multi_physical_planner.c` walks the expected logical shape (`MultiTreeRoot -> MasterExtendedOp -> MultiCollect -> WorkerExtendedOp`) and sets only `distributedPlan->useSortedMerge`.
- Sort-key metadata is rebuilt from `workerJob->jobQuery` during finalization.
Pathkeys / Sort elision (`combine_query_planner.c`)

- `CreateCitusCustomScanPath()` sets `path->custom_path.path.pathkeys = root->sort_pathkeys` when `distributedPlan->useSortedMerge` is true.
- PostgreSQL then treats the `CustomScan` output as already sorted and does not add a coordinator `Sort` node. (See the sketch below.)
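The change itself is tiny; a sketch of the relevant lines, with the surrounding path construction elided (field spellings follow the text above):

```c
/* inside CreateCitusCustomScanPath(), after the CustomPath is built */
if (distributedPlan->useSortedMerge)
{
	/*
	 * Advertise the CustomScan output as sorted on the query's requested
	 * pathkeys; the planner then sees no reason to add a coordinator Sort.
	 */
	path->custom_path.path.pathkeys = root->sort_pathkeys;
}
```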
Per-task result routing (`adaptive_executor.c`, `sorted_merge.c`)

- When `distributedPlan->useSortedMerge` is true, `AdaptiveExecutor()` calls `ClearPerTaskDispatchDests()` at execution start to remove any stale execution-local destinations from a cached task list.
- `AssignPerTaskDispatchDests()` creates one tuplestore per task and assigns `task->tupleDest` directly to a `TupleStoreTupleDest` for that store.
- All per-task destinations share one `TupleDestinationStats` object, so `citus.max_intermediate_result_size` is enforced across the aggregate per-task output, not independently per task.
- Routing reuses the existing task destination mechanism (`task->tupleDest`) instead of a separate hash-table dispatch destination. (A sketch follows the list.)
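A sketch of the assignment step. `CreateTupleStoreTupleDestWithStats()` appears in the files-changed list below, but its exact signature is not shown in this description, so the signature here, along with the `work_mem` sizing, is an assumption:

```c
/*
 * Sketch: one tuplestore per task, all accounted against a single shared
 * TupleDestinationStats so citus.max_intermediate_result_size caps the
 * aggregate output rather than each task's output independently.
 * Task and TupleDestinationStats are Citus-internal types.
 */
static void
AssignPerTaskDispatchDests(List *taskList)
{
	TupleDestinationStats *sharedStats =
		palloc0(sizeof(TupleDestinationStats));
	ListCell *taskCell = NULL;

	foreach(taskCell, taskList)
	{
		Task *task = (Task *) lfirst(taskCell);
		Tuplestorestate *taskStore =
			tuplestore_begin_heap(false, false, work_mem);

		/* route this task's tuples into its own store, on the shared budget */
		task->tupleDest =
			CreateTupleStoreTupleDestWithStats(taskStore, sharedStats);
	}
}
```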
EXPLAIN ANALYZE path
- `ExplainAnalyzeTaskList()` preserves the original task destination when it wraps tasks for EXPLAIN ANALYZE.
- Query 1 plan-fetch tuples are consumed by the explain wrapper.
- EXPLAIN ANALYZE otherwise executes by falling back to the non-merge path.
Sorted merge finalization (`sorted_merge.c`)

- `AdaptiveExecutor()` calls `FinalizeSortedMerge()` once.
- `FinalizeSortedMerge()` rebuilds `SortedMergeKey` data from `workerJob->jobQuery->sortClause` and `targetList`.
- In eager mode, it drains the k-way merge into `scanState->tuplestorestate` and frees the per-task stores.
- In streaming mode, it attaches a `SortedMergeAdapter` to `scanState->mergeAdapter`; the adapter owns and later frees the per-task stores.
- `AdaptiveExecutor()` then calls `ClearPerTaskDispatchDests()` again so cached prepared plans cannot retain pointers into the executor-local memory context. (A merge sketch follows the list.)
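The eager-mode drain is the classic k-way merge; here is a minimal sketch modeled on PostgreSQL's `nodeMergeAppend.c`, as the PR describes. `CompareTaskSlots()` stands in for a `heap_compare_slots`-style comparator built from the `SortedMergeKey` / `SortSupport` data; the function name and signature are assumptions, not the PR's actual code:

```c
#include "postgres.h"

#include "executor/tuptable.h"
#include "lib/binaryheap.h"
#include "utils/tuplestore.h"

/* heap_compare_slots-style comparator over per-task head tuples (elided) */
static int CompareTaskSlots(Datum a, Datum b, void *arg);

/*
 * Drain pre-sorted per-task stores into finalStore in global sort order.
 * taskSlots[i] always holds the current head tuple of taskStores[i].
 */
static void
MergePerTaskStores(Tuplestorestate **taskStores, TupleTableSlot **taskSlots,
				   int storeCount, Tuplestorestate *finalStore)
{
	binaryheap *heap = binaryheap_allocate(storeCount, CompareTaskSlots,
										   taskSlots);

	/* prime the heap with the first tuple of every non-empty store */
	for (int storeIndex = 0; storeIndex < storeCount; storeIndex++)
	{
		if (tuplestore_gettupleslot(taskStores[storeIndex], true, false,
									taskSlots[storeIndex]))
		{
			binaryheap_add_unordered(heap, Int32GetDatum(storeIndex));
		}
	}
	binaryheap_build(heap);

	/* repeatedly emit the winner and advance the store it came from */
	while (!binaryheap_empty(heap))
	{
		int winner = DatumGetInt32(binaryheap_first(heap));

		tuplestore_puttupleslot(finalStore, taskSlots[winner]);

		if (tuplestore_gettupleslot(taskStores[winner], true, false,
									taskSlots[winner]))
		{
			binaryheap_replace_first(heap, Int32GetDatum(winner));
		}
		else
		{
			(void) binaryheap_remove_first(heap);
		}
	}

	binaryheap_free(heap);
}
```

Because each per-task store is already sorted, the heap always exposes the globally smallest head tuple, so the drain costs O(N log k) for N total tuples across k tasks.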
Returning tuples (`multi_executor.c`)

- `ReturnTupleFromTuplestore()` now goes through `FetchNextScanTuple()`.
- With no merge adapter attached, it reads from `scanState->tuplestorestate` as before.
- With a streaming adapter attached, `FetchNextScanTuple()` calls `SortedMergeAdapterNext()` and returns the winner slot. (See the sketch below.)
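A sketch of the dispatch. `SortedMergeAdapterNext()` and the `mergeAdapter` field come from this description, but the exact signature and return convention are assumptions:

```c
/*
 * Sketch: one fetch function serves both modes. With a streaming adapter
 * attached, tuples come from the k-way merge; otherwise they come from
 * the final tuplestore, exactly as before this PR.
 */
static TupleTableSlot *
FetchNextScanTuple(CitusScanState *scanState, TupleTableSlot *slot)
{
	if (scanState->mergeAdapter != NULL)
	{
		/* streaming mode: pull the next winner slot from the merge */
		return SortedMergeAdapterNext(scanState->mergeAdapter, slot);
	}

	/* eager mode and all pre-existing paths: read the final tuplestore */
	if (tuplestore_gettupleslot(scanState->tuplestorestate,
								true /* forward */, false, slot))
	{
		return slot;
	}

	return NULL;		/* no more tuples */
}
```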
Backward scan and SCROLL cursors

The streaming adapter is forward-only.
To keep cursor behavior correct:
- `FinalizePlan()` omits `CUSTOMPATH_SUPPORT_BACKWARD_SCAN` when both `distributedPlan->useSortedMerge` and `citus.enable_streaming_sorted_merge` are active.
- `CreateDistributedPlannedStmt()` re-runs PostgreSQL's finished-plan materialization check after Citus replaces the plan tree. For SCROLL cursors, it wraps the plan in a `Material` node when the finished plan does not support backward scan.
- `FetchNextScanTuple()` still has a defensive user-facing error if a backward request reaches the forward-only streaming adapter. (A sketch of the re-check follows the list.)
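PostgreSQL's own `standard_planner()` applies the same rule to finished plans. A sketch of the re-check as it might sit in `CreateDistributedPlannedStmt()` (placement and variable names are assumptions; `ExecSupportsBackwardScan()` and `materialize_finished_plan()` are stock PostgreSQL functions):

```c
/* after Citus has replaced the plan tree in the finished PlannedStmt */
if ((cursorOptions & CURSOR_OPT_SCROLL) &&
	!ExecSupportsBackwardScan(resultPlan->planTree))
{
	/*
	 * A SCROLL cursor over a forward-only sorted-merge scan: wrap the
	 * plan in a Material node so backward fetches can be satisfied.
	 */
	resultPlan->planTree = materialize_finished_plan(resultPlan->planTree);
}
```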
Safety properties
- The executor acts only on `distributedPlan->useSortedMerge`; it does not re-check `citus.enable_sorted_merge`.
- Eager and streaming modes share the same merge implementation, and only differ in whether finalization drains into `scanState->tuplestorestate` or attaches `scanState->mergeAdapter`.
- `ClearPerTaskDispatchDests()` runs at both the start and end of sorted-merge execution.
- Plans with aggregates are not marked sorted-merge eligible because worker partial aggregate ordering does not necessarily match coordinator final aggregate ordering.
- All per-task destinations share one `TupleDestinationStats` budget.
- EXPLAIN ANALYZE preserves the original task destination and consumes worker plan tuples separately.
- A backward scan in streaming mode either gets a `Material` node for SCROLL cursors or reports a clear unsupported-backward-scan error.
Files changed
- `src/backend/distributed/executor/sorted_merge.c` / `src/include/distributed/sorted_merge.h`: `FinalizeSortedMerge()`
- `src/backend/distributed/executor/adaptive_executor.c`: calls `FinalizeSortedMerge()`, and clears task destinations
- `src/backend/distributed/executor/multi_executor.c`
- `src/include/distributed/citus_custom_scan.h` and `citus_custom_scan.c`: `scanState->mergeAdapter`; supports rescan
- `src/backend/distributed/executor/tuple_destination.c` / `src/include/distributed/tuple_destination.h`: `CreateTupleStoreTupleDestWithStats()` and exports `EnsureIntermediateSizeLimitNotExceeded()`
- `src/backend/distributed/planner/multi_logical_optimizer.c` / `src/include/distributed/multi_logical_planner.h`: `sortedMergeEligible` on `MultiExtendedOp`
- `src/backend/distributed/planner/multi_physical_planner.c`: `distributedPlan->useSortedMerge` via `UseSortedMerge()`
- `src/backend/distributed/planner/combine_query_planner.c`
- `src/backend/distributed/planner/distributed_planner.c`
- `src/backend/distributed/shared_library_init.c`, `multi_executor.c`, `multi_executor.h`
- `src/backend/distributed/utils/citus_copyfuncs.c`, `citus_outfuncs.c`: `useSortedMerge`
- `src/test/regress/sql/setup_multi_orderby_pushdown.sql.include`
- `src/test/regress/sql/multi_orderby_pushdown.sql.include`
- `src/test/regress/sql/multi_orderby_pushdown_streaming.sql`
- `src/test/regress/expected/multi_orderby_pushdown_streaming.out`

Test coverage
The regression coverage now includes:
- Plan shape checks with `citus.enable_sorted_merge`, including coordinator `Sort` elision.
- Queries with stable/volatile functions, DISTINCT / DISTINCT ON, OFFSET, subqueries, CTEs, joins, UNION ALL, EXISTS, and IN subquery patterns.
- Cursor forward-only handling.
- `work_mem`, subplan, and `citus.max_intermediate_result_size` interactions.
- Statements that embed sorted-merge SELECTs.
- The suite runs once in eager mode and once with `citus.enable_streaming_sorted_merge=on`.

Validation
- `check-multi` and `check-mx-multi` pass with the GUCs ON by default (the only observed differences are expected changes in plan output)
- `check-style`: pass