
Sorted Merge: Eliminate coordinator Sort node for multi-shard ORDER BY queries#8529

Open
neildsh wants to merge 34 commits into citusdata:main from neildsh:sortedMerge

Conversation

@neildsh neildsh commented Mar 23, 2026

Sorted Merge: Eliminate coordinator Sort node for multi-shard ORDER BY queries

Summary

This PR adds a sorted-merge execution path for multi-shard SELECT ... ORDER BY
queries and now also includes the streaming sorted-merge adapter work from
#8545.

Today, without this feature, Citus collects worker results and lets PostgreSQL
place a Sort node above Custom Scan (Citus Adaptive) to produce the final
global order. With sorted merge enabled, Citus pushes safe ORDER BY clauses to
workers, marks the CustomPath as already sorted via pathkeys, and k-way
merges the pre-sorted worker results on the coordinator using PostgreSQL's
binaryheap and SortSupport APIs. That lets PostgreSQL elide the coordinator
Sort node.

There are two hidden experimental GUCs:

| GUC | Default | Context | Purpose |
| --- | --- | --- | --- |
| citus.enable_sorted_merge | off | PGC_SUSET | Plan-time eligibility gate for pushing ORDER BY to workers and using coordinator sorted merge |
| citus.enable_streaming_sorted_merge | off | PGC_USERSET | When sorted merge is active, use a streaming adapter instead of eagerly copying merged tuples into a final tuplestore |

The implementation follows the same sorted-input merge pattern as PostgreSQL's
MergeAppend.

Plan shape

Without sorted merge:

Sort
  -> Custom Scan (Citus Adaptive)
       -> worker results collected by the adaptive executor

With sorted merge, eager mode:

Custom Scan (Citus Adaptive)          -- pathkeys match root->sort_pathkeys
  -> per-task tuplestores             -- one store per task
     -> FinalizeSortedMerge()
        -> MergePerTaskStoresIntoFinalStore()
           -> scanState->tuplestorestate

With sorted merge, streaming mode:

Custom Scan (Citus Adaptive)          -- pathkeys match root->sort_pathkeys
  -> per-task tuplestores             -- one store per task
     -> FinalizeSortedMerge()
        -> SortedMergeAdapter
           -> ReturnTupleFromTuplestore()

The streaming adapter is still post-execution: RunDistributedExecution() fills
the per-task stores first, then FinalizeSortedMerge() attaches the adapter.
The streaming benefit is that the final merged tuplestore copy is skipped and a
parent LIMIT can stop pulling from the merge adapter early.

Current control flow

  1. Worker sort pushdown (multi_logical_optimizer.c)

    • WorkerSortClauseList() has an early path for
      citus.enable_sorted_merge=on.
    • It pushes ORDER BY to workers even without LIMIT when the sort is safe:
      no aggregate in the ORDER BY expression, only pushable window functions,
      and either no GROUP BY or a partition-compatible GROUP BY.
    • It also reports whether that worker sort is sorted-merge eligible.
  2. Logical / physical eligibility

    • WorkerExtendedOpNode() copies the logical
      queryOrderByLimit.sortedMergeEligible decision into the worker
      MultiExtendedOp.
    • UseSortedMerge() in multi_physical_planner.c walks the expected logical
      shape (MultiTreeRoot -> MasterExtendedOp -> MultiCollect ->
      WorkerExtendedOp) and sets only distributedPlan->useSortedMerge.
    • Merge key metadata is recomputed from the cached workerJob->jobQuery during finalization.
  3. Pathkeys / Sort elision (combine_query_planner.c)

    • CreateCitusCustomScanPath() sets
      path->custom_path.path.pathkeys = root->sort_pathkeys when
      distributedPlan->useSortedMerge is true.
    • PostgreSQL then sees the CustomScan output as already sorted and does
      not add a coordinator Sort node.
  4. Per-task result routing (adaptive_executor.c, sorted_merge.c)

    • When distributedPlan->useSortedMerge is true, AdaptiveExecutor() calls
      ClearPerTaskDispatchDests() at execution start to remove any stale
      execution-local destinations from a cached task list.
    • AssignPerTaskDispatchDests() creates one tuplestore per task and assigns
      task->tupleDest directly to a TupleStoreTupleDest for that store.
    • All per-task destinations share one TupleDestinationStats object so
      citus.max_intermediate_result_size is enforced across the aggregate
      per-task output, not independently per task.
    • The hot tuple path therefore uses the existing task-specific destination
      mechanism (task->tupleDest) instead of a separate hash-table dispatch
      destination.
  5. EXPLAIN ANALYZE path

    • ExplainAnalyzeTaskList() preserves the original task destination when it
      wraps tasks for EXPLAIN ANALYZE.
    • Query 0 data tuples are forwarded into the original per-task destination;
      query 1 plan-fetch tuples are consumed by the explain wrapper.
    • This means sorted merge remains active under EXPLAIN ANALYZE rather than
      falling back to the non-merge path.
  6. Sorted merge finalization (sorted_merge.c)

    • After remote and local tasks finish, AdaptiveExecutor() calls
      FinalizeSortedMerge() once.
    • FinalizeSortedMerge() rebuilds SortedMergeKey data from
      workerJob->jobQuery->sortClause and targetList.
    • In eager mode, it drains the heap merge into scanState->tuplestorestate
      and frees the per-task stores.
    • In streaming mode, it attaches a SortedMergeAdapter to
      scanState->mergeAdapter; the adapter owns and later frees the per-task
      stores.
    • AdaptiveExecutor() then calls ClearPerTaskDispatchDests() again so
      cached prepared plans cannot retain pointers into the executor-local memory
      context.
  7. Returning tuples (multi_executor.c)

    • ReturnTupleFromTuplestore() now goes through FetchNextScanTuple().
    • Without a merge adapter, it reads from scanState->tuplestorestate as
      before.
    • With a streaming adapter, it calls SortedMergeAdapterNext() and
      returns the winner slot.
    • The existing quals/projection loop works with whichever slot
      FetchNextScanTuple() returns.

Backward scan and SCROLL cursors

The streaming adapter is forward-only.

To keep cursor behavior correct:

  1. FinalizePlan() omits CUSTOMPATH_SUPPORT_BACKWARD_SCAN when both
    distributedPlan->useSortedMerge and
    citus.enable_streaming_sorted_merge are active.
  2. CreateDistributedPlannedStmt() re-runs PostgreSQL's finished-plan
    materialization check after Citus replaces the plan tree. For SCROLL
    cursors, it wraps the plan in a Material node when the finished plan does
    not support backward scan.
  3. FetchNextScanTuple() still has a defensive user-facing error if a backward
    request reaches the forward-only streaming adapter.

Safety properties

  • Sorted-merge eligibility is plan-time only: the executor reads
    distributedPlan->useSortedMerge; it does not re-check
    citus.enable_sorted_merge.
  • Streaming mode is isolated: eager and streaming modes share the same
    merge implementation, and only differ in whether finalization drains into
    scanState->tuplestorestate or attaches scanState->mergeAdapter.
  • Cached plans are safe: task-specific tuple destinations are cleared at the
    start and end of sorted-merge execution.
  • Aggregate ORDER BY is excluded: ORDER BY expressions containing
    aggregates are not marked sorted-merge eligible because worker partial
    aggregate ordering does not necessarily match coordinator final aggregate
    ordering.
  • Intermediate result accounting is preserved: all per-task tuple
    destinations share one TupleDestinationStats budget.
  • EXPLAIN ANALYZE is supported: the explain wrapper forwards data tuples to
    the original task destination and consumes worker plan tuples separately.
  • Backward scan is safe: eager mode keeps the normal tuplestore behavior;
    streaming mode either gets a Material node for SCROLL cursors or reports a
    clear unsupported-backward-scan error.

Files changed

| File | Change |
| --- | --- |
| src/backend/distributed/executor/sorted_merge.c / src/include/distributed/sorted_merge.h | Sorted merge key building, per-task destination assignment/clearing, eager merge, streaming adapter, and FinalizeSortedMerge() |
| src/backend/distributed/executor/adaptive_executor.c | Sets per-task destinations for sorted merge, executes tasks, calls FinalizeSortedMerge(), and clears task destinations |
| src/backend/distributed/executor/multi_executor.c | Fetches from either the final tuplestore or streaming adapter; returns adapter-owned slots in streaming mode |
| src/include/distributed/citus_custom_scan.h and citus_custom_scan.c | Adds and cleans up scanState->mergeAdapter; supports rescan |
| src/backend/distributed/executor/tuple_destination.c / src/include/distributed/tuple_destination.h | Adds CreateTupleStoreTupleDestWithStats() and exports EnsureIntermediateSizeLimitNotExceeded() |
| src/backend/distributed/planner/multi_logical_optimizer.c / src/include/distributed/multi_logical_planner.h | Pushes worker sorts for eligible ORDER BY queries and carries sortedMergeEligible on MultiExtendedOp |
| src/backend/distributed/planner/multi_physical_planner.c | Sets distributedPlan->useSortedMerge via UseSortedMerge() |
| src/backend/distributed/planner/combine_query_planner.c | Sets CustomPath pathkeys for sorted-merge plans |
| src/backend/distributed/planner/distributed_planner.c | Handles streaming adapter backward-scan flags and SCROLL cursor materialization |
| src/backend/distributed/shared_library_init.c, multi_executor.c, multi_executor.h | Registers and stores the sorted-merge GUCs |
| src/backend/distributed/utils/citus_copyfuncs.c, citus_outfuncs.c | Serializes/copies useSortedMerge |
| src/test/regress/sql/setup_multi_orderby_pushdown.sql.include | Shared sorted-merge test setup |
| src/test/regress/sql/multi_orderby_pushdown.sql.include | Core sorted-merge correctness and plan-shape test suite |
| src/test/regress/sql/multi_orderby_pushdown_streaming.sql | Runs the suite in eager and streaming modes |
| src/test/regress/expected/multi_orderby_pushdown_streaming.out | Expected output for eager + streaming coverage |

Test coverage

The regression coverage now includes:

  • GUC basics for citus.enable_sorted_merge.
  • Plan-shape checks showing worker ORDER BY pushdown and coordinator Sort
    elision.
  • Ineligibility checks for ORDER BY aggregates and unsupported GROUP BY shapes.
  • Result comparisons with sorted merge off vs on.
  • Multi-column, mixed-direction, NULL ordering, ordinal ORDER BY, expressions,
    stable/volatile functions, DISTINCT / DISTINCT ON, OFFSET, subqueries, CTEs,
    joins, UNION ALL, EXISTS, and IN subquery patterns.
  • Plan-cache behavior when the sorted-merge GUC is toggled after PREPARE.
  • Cursor and SCROLL cursor behavior, including streaming adapter
    forward-only handling.
  • EXPLAIN ANALYZE and auto_explain paths.
  • Small work_mem, subplan, and citus.max_intermediate_result_size
    interactions.
  • Distributed transaction scenarios with INSERT/UPDATE/DELETE followed by
    sorted-merge SELECTs.
  • Index-backed ORDER BY cases where workers can avoid an explicit Sort.
  • A streaming test wrapper that runs the same sorted-merge suite once in eager
    mode and once with citus.enable_streaming_sorted_merge=on.

Validation

  • check-multi and check-mx-multi pass with the GUCs ON by default; the only
    observed differences are the expected plan-output changes
  • check-style: pass
  • CI lint scripts: pass


codecov Bot commented Mar 24, 2026

Codecov Report

❌ Patch coverage is 85.77778% with 32 lines in your changes missing coverage. Please review.
✅ Project coverage is 88.90%. Comparing base (42edfe8) to head (e3fae78).
⚠️ Report is 5 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #8529      +/-   ##
==========================================
+ Coverage   88.88%   88.90%   +0.01%     
==========================================
  Files         286      287       +1     
  Lines       63763    63898     +135     
  Branches     8017     8050      +33     
==========================================
+ Hits        56678    56806     +128     
  Misses       4776     4776              
- Partials     2309     2316       +7     

neildsh added 15 commits April 14, 2026 20:03
Phase 1 of the sorted-merge feature. This commit adds the data structures and GUC needed by later phases, with zero behavioral changes:
- SortedMergeKey typedef in multi_physical_planner.h describing one
  sort key for the coordinator k-way merge
- useSortedMerge, sortedMergeKeys[], sortedMergeKeyCount fields on
  DistributedPlan (plan-time decision, never checked at runtime via GUC)
- sortedMergeEligible field on MultiExtendedOp (logical optimizer tag
  read by the physical planner)
- Hidden GUC citus.enable_sorted_merge (PGC_SUSET, default off,
  GUC_NO_SHOW_ALL) consulted only during planning
- Serialization in citus_outfuncs.c and deep-copy in citus_copyfuncs.c
  for all new fields

All new fields default to false/0/NULL. Existing regression tests are
unaffected.

Co-authored-by: Copilot
Phase 2 of the sorted-merge feature. Workers now sort their results
when citus.enable_sorted_merge is enabled at planning time, even for
queries without LIMIT. The plan metadata is populated so later phases
can execute the merge and set pathkeys.

Logical optimizer changes (multi_logical_optimizer.c):
- WorkerSortClauseList() gains an early-return path that pushes the
  sort clause to workers when the GUC is on and the sort is safe
  (no aggregates in ORDER BY, no non-pushable window functions,
  and either no GROUP BY or GROUP BY on partition column).
- WorkerExtendedOpNode() sets sortedMergeEligible = true when the
  worker sort clause semantically matches the original sort clause,
  using the new SortClauseListsMatch() helper.
- SortClauseListsMatch() compares tleSortGroupRef, sortop,
  nulls_first, and eqop for each pair.

Physical planner changes (multi_physical_planner.c):
- CreatePhysicalDistributedPlan() finds the worker MultiExtendedOp
  with sortedMergeEligible = true, builds SortedMergeKey metadata
  from the worker job query, and sets useSortedMerge on the plan.
- BuildSortedMergeKeys() constructs the key array from the worker
  query's SortGroupClause list and target list.

The coordinator Sort node is still present above the CustomScan
(pathkeys not set yet — that is Phase 4). Results are correct
because the redundant Sort re-sorts already-sorted data.

Co-authored-by: Copilot
Phase 3 of the sorted-merge feature. When distributedPlan->useSortedMerge
is true (set at planning time by Phase 2), the adaptive executor now:
1. Routes worker results into per-task tuple stores via a new
   PerTaskDispatchTupleDest that dispatches by task->taskId hash lookup.
   No Task fields are mutated — all state lives on DistributedExecution.
2. After all tasks complete, performs a k-way merge of the per-task stores
   into the final scanState->tuplestorestate using PostgreSQL's public
   binaryheap and SortSupport APIs.
3. Frees per-task stores after the merge.

The existing CitusExecScan/ReturnTupleFromTuplestore/CitusEndScan/
CitusReScan code paths are completely unchanged — they read from
the final tuplestore exactly as before.

New files:
- sorted_merge.h: CreatePerTaskDispatchDest, MergePerTaskStoresIntoFinalStore
- sorted_merge.c: PerTaskDispatchTupleDest with taskId->index hash routing,
  MergePerTaskStoresIntoFinalStore with binaryheap merge, MergeHeapComparator
  modeled after PG's heap_compare_slots in nodeMergeAppend.c
Modified:
- adaptive_executor.c: DistributedExecution gains useSortedMerge/perTaskStores/
  perTaskStoreCount fields. AdaptiveExecutor() branches on useSortedMerge to
  create per-task stores, then merges post-execution. EXPLAIN ANALYZE falls
  back to existing single-tuplestore path.
Safety:
- Shared TupleDestinationStats preserves citus.max_intermediate_result_size
- Per-task stores allocated in AdaptiveExecutor local memory context
  (auto-cleanup on error via PG memory context teardown)
- task->totalReceivedTupleData tracking preserved

The coordinator Sort node is still present above the CustomScan (pathkeys
not set until Phase 4). Results are correct because the redundant Sort
re-sorts already-sorted data.
Co-authored-by: Copilot
…n indication of sorted merge in the EXPLAIN output
@neildsh neildsh mentioned this pull request May 4, 2026
@onurctirtir onurctirtir self-requested a review May 6, 2026 09:13
Member

@onurctirtir onurctirtir left a comment


If we don't have any cases where non-streaming version performs faster than the streaming one, then I think we should always do a streaming merge when citus.enable_sorted_merge is enabled, and should remove citus.enable_streaming_sorted_merge. Do you think this makes sense?

Once we conclude the matter above, I'll provide a deeper code review and test the PR, so I'm waiting for now. Btw, I had a chance to take a closer look into the PR and it's definitely going in the right direction at the high level, thanks a lot!


Also, just to make sure I'm making correct assumptions after yesterday's call: if we choose to always do streaming merge, do we have a way to detect "early" that backward scan is requested and we cannot fulfill it? "Early" ideally means "in the planner", which I don't quite think is possible, or in the executor. I mean, I'd even be okay if we fall back to the older behavior (where we do a final sort on the coordinating node) in the executor by emitting a notice, once / if we can detect that backward scan is requested before trying to fetch the first tuple, though I can't comment on whether that's feasible either.

If none is possible, then yes, the current behavior with streaming merge is just fine, if we choose to drop "non-streaming" mode.


Also, I'd like to mention a few things we can try before merging the PR, or occasionally during development:

  1. We should run the sorted merge tests by "disabling" the GUC in the test file. The expectation would be to have the same test output (except the SET .. GUC change) - this feels like a good way to validate the output.
  2. We should hardcode the default setting for the GUC to "ON" and should push maybe a new branch to the CI to see if rest of the test suite is still happy even while the GUC is turned ON. except any notice / debug messages added for the new feature.

Also, I didn't yet have a chance to check the regression tests added in this branch (although I'm seeing plenty of new ones, thanks!), but I'd still like to mention a few tests cases that we might want to add:

  • insert .. select .. order by .. limit
  • repartition joins
  • router (single shard) queries
  • tables with dropped columns (super important)
  • weird / interesting queries, such as:
    • select from dist order by column_a; -- yes, Postgres allows selecting 0 columns, and so does Citus - make sure we don't crash
    • select .. from .. order by .. limit 0; -- empty task tuple stores
    • select .. from .. order by ..; -- on a distributed table that contains all its rows in a single shard
