Fetch tuples in small batches in adaptive executor where possible #5195
Open
marcocitus wants to merge 4 commits into main from
Conversation
Codecov Report
❌ Patch coverage is
Additional details and impacted files

```
@@            Coverage Diff             @@
##             main    #5195      +/-   ##
==========================================
- Coverage   88.90%   88.77%   -0.14%
==========================================
  Files         286      287       +1
  Lines       63686    64123     +437
  Branches     8010     8067      +57
==========================================
+ Hits        56619    56924     +305
- Misses       4761     4870     +109
- Partials     2306     2329      +23
```
marcocitus commented Aug 20, 2021
```c
ResetExplainAnalyzeData(taskList);
MemoryContext memoryContext = AllocSetContextCreate(executorState->es_query_cxt,
                                                    "AdaptiveExecutor",
                                                    ALLOCSET_DEFAULT_SIZES);
```
Contributor
Author
TODO: maybe reset this context at the very end since it uses es_query_cxt
marcocitus commented Aug 20, 2021

I hope this feature makes it in. :-)
f33de24 to a8d14ee (force-push)
Contributor (Author)

@microsoft-github-policy-service agree [company="Snowflake"] Though this work was done during my Microsoft/Citus Data tenure, so it already belongs to Microsoft.

Contributor (Author)

@microsoft-github-policy-service agree
c140524 to 4200722 (force-push)
caaf155 to 9df3b85 (force-push)
07bf04f to 8eb2656 (force-push)
…d execution, tests.
- Resource clean-up: AdaptiveExecutorEnd() releases sessions/connections when an error occurs between AdaptiveExecutorRun calls. Also handles early termination (cursor close, LIMIT satisfied) with proper clean-up of in-flight worker queries.
- ShouldRunTasksSequentially() check in FinishDistributedExecution() replaced with an explicit sessionsCleanedUp flag on the DistributedExecution struct. Fixes double CleanUpSessions on the sequential path.
- Adaptive batch sizing via citus.executor_batch_size (default 0 => auto). Auto mode calculates batch size from work_mem and the TupleDesc (attlen + typmod for varlena, 128B default for unbounded). Floor 100, ceiling 1M rows.
- Remote execution uses libpq's chunked mode (PG17+), GUC-configurable for now.
- Local execution is eager; it runs to completion.
- Regress test suite: 11 test cases covering batch sizes 1/10/100K/auto, empty results, LIMIT, aggregation, DML RETURNING, GUC behavior, between-batch error cleanup, cursor close mid-batch, and cross-batch-size result consistency.
292f2e6 to d9db2bd (force-push)
- The wait event set was getting re-created for every batch. Now it is preserved across the entire distributed execution.
- Also remove the distributed planner's marking of the Citus scan with backward scan support. This requires FinalizePlan() to re-insert a Material node if a scrollable cursor has been requested.
- Update backward scan tests to declare cursors as scrollable. This had a knock-on impact on `multi_mx_alter_distributed_table` because of cleanup added to `multi_mx_reference_table`. Also add `EXPLAIN ANALYZE` tests.
d9db2bd to f626550 (force-push)
codeforall reviewed May 4, 2026
0983333 to ec127da (force-push)
ec127da to de8d6d4 (force-push)
DESCRIPTION: Fetch tuples in small batches in adaptive executor where possible
Motivation

Currently the adaptive executor runs the entire distributed query to completion on the first ExecScan call, buffering all rows before returning any to the client. For large result sets this can mean:

- Memory usage far beyond work_mem, even when the client only needs a few rows (e.g. cursors, LIMIT).
- Heavy libpq overhead: PQsetSingleRowMode() returns exactly one row per PQgetResult() call. A 200K-row result makes ~200K PQgetResult() calls, each allocating and freeing a PGresult.
- No rows reach the client until the entire result set has been fetched from all workers.
Batched Adaptive Executor

This PR makes RunDistributedExecution re-entrant: instead of running to completion, it stops after collecting maxBatchSize rows into the tuplestore, returns control to PostgreSQL, and resumes where it left off when PostgreSQL asks for more rows.
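To make the control flow concrete, here is a minimal sketch of that re-entrancy contract. The struct, its fields, and ProcessReadyConnections() are hypothetical stand-ins rather than the actual Citus internals; only the stop-after-maxBatchSize behavior is taken from the PR description.

```c
/*
 * Hypothetical sketch of the re-entrant batch loop; the struct fields and
 * ProcessReadyConnections() are illustrative, not the real Citus code.
 */
typedef struct BatchedExecutionSketch
{
	bool finished;          /* all remote tasks done? */
	uint64 rowsInBatch;     /* rows written to the tuplestore this batch */
} BatchedExecutionSketch;

static uint64 ProcessReadyConnections(BatchedExecutionSketch *execution);

/* Returns true once the whole distributed query is complete. */
static bool
RunOneBatch(BatchedExecutionSketch *execution, uint64 maxBatchSize)
{
	execution->rowsInBatch = 0;

	while (!execution->finished && execution->rowsInBatch < maxBatchSize)
	{
		/* wait for worker I/O, then append any ready rows to the tuplestore */
		execution->rowsInBatch += ProcessReadyConnections(execution);
	}

	return execution->finished;
}
```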
Adaptive Executor entry points

The executor is split from a single AdaptiveExecutor() call into:

- AdaptiveExecutorStart(): called once on the first ExecScan. Sets up connections, computes maxBatchSize, creates the WaitEventSet.
- AdaptiveExecutorRun(): called repeatedly. Each call runs RunDistributedExecution to fill one batch and returns false (more rows available) or true (query complete).
- AdaptiveExecutorEnd(): called on EndScan or early termination (cursor close, LIMIT). If the latter, cancels in-flight worker queries, drains pending results, and releases connections.

CitusExecScan reads from the tuplestore until it's empty, then calls AdaptiveExecutorRun again for the next batch. Between batches the tuplestore is cleared via tuplestore_clear().
Adaptive batch sizing

CalculateMaxBatchSize() sizes each batch to fit within work_mem. It estimates the per-row memory footprint from the TupleDesc (fixed-width attribute lengths, 128 bytes for unbounded varlena, a typmod-based estimate for bounded varlena) and divides work_mem by that estimate, clamped to a floor of 100 and a ceiling of 1M rows. The tuplestore should never spill to disk during normal operation.

The GUC citus.executor_batch_size overrides the auto-calculation with a fixed row count (default 0 = auto).
Chunked libpq mode (PG17+)

On PostgreSQL 17 and later, PQsetChunkedRowsMode(conn, chunkSize) is used instead of PQsetSingleRowMode(). Each PQgetResult() returns up to chunkSize rows (default 8192, configurable via citus.executor_chunk_size). For a 200K-row result this is ~24 PQgetResult() calls instead of ~200K, roughly an 8,000x reduction in libpq overhead.

On PG16, PQsetSingleRowMode() is still used, but the batching and re-entrancy benefits still apply.
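A sketch of how a worker connection might be switched into streaming mode. The ExecutorChunkSize variable backing citus.executor_chunk_size and the use of Citus's ReportConnectionError() are assumptions about the surrounding code; the two libpq calls and their 0-on-failure return convention are the real PG17 and pre-PG17 APIs.

```c
/* Assumed GUC variable backing citus.executor_chunk_size (default 8192). */
static int ExecutorChunkSize = 8192;

/* Sketch: put one worker connection into streaming mode. */
static void
SetStreamingMode(MultiConnection *connection)
{
#if PG_VERSION_NUM >= 170000
	/* PG17+: each PQgetResult() returns up to ExecutorChunkSize rows */
	if (PQsetChunkedRowsMode(connection->pgConn, ExecutorChunkSize) == 0)
	{
		ReportConnectionError(connection, ERROR);
	}
#else
	/* PG16 and earlier: one row per PQgetResult() */
	if (PQsetSingleRowMode(connection->pgConn) == 0)
	{
		ReportConnectionError(connection, ERROR);
	}
#endif
}
```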
WaitEventSet preservation

The WaitEventSet is created once in AdaptiveExecutorStart and reused across all batches of the same query. Previously, even in the single-shot code path, the event set was created once per execution; now it simply persists until AdaptiveExecutorEnd frees it.
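A lifecycle sketch using the PG17 CreateWaitEventSet() signature (earlier versions take a MemoryContext instead of a ResourceOwner); connectionCount, timeoutMs, and the fragment structure are illustrative, not the actual Citus code.

```c
/* Fragments keyed to the three entry points. */
int connectionCount = 4;    /* illustrative; tracked by the real executor */
long timeoutMs = -1;        /* block until an event arrives */

/* AdaptiveExecutorStart: create once, sized for every worker connection
 * plus the process latch and postmaster-death slots. */
WaitEventSet *waitEventSet =
	CreateWaitEventSet(CurrentResourceOwner, connectionCount + 2);
AddWaitEventToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
AddWaitEventToSet(waitEventSet, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL, NULL);
/* ... one WL_SOCKET_READABLE event per worker connection ... */

/* AdaptiveExecutorRun: every batch reuses the same set instead of
 * re-creating it. */
WaitEvent occurredEvents[16];
int eventCount = WaitEventSetWait(waitEventSet, timeoutMs, occurredEvents,
								  lengthof(occurredEvents), PG_WAIT_EXTENSION);

/* AdaptiveExecutorEnd: freed only when the scan ends. */
FreeWaitEventSet(waitEventSet);
```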
Scrollable cursor support (Material node insertion)

The Citus CustomScan now does not advertise backward-scan capability (EXEC_FLAG_BACKWARD), because the tuplestore is cleared between batches and cannot support backward fetches across batch boundaries.

For SCROLL CURSOR queries, FinalizePlan() now calls PostgreSQL's materialize_finished_plan() to wrap the Citus CustomScan in a Material node. The Material node has its own tuplestore that accumulates all rows across batches, supporting full backward scan. Since Material is lazy (it doesn't force the child to run to completion upfront), batching benefits are preserved: rows are pulled on demand.

Cursors declared without SCROLL work naturally: forward-only iteration drains one batch at a time without materialization.
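A sketch of the planner-side change. The function shape, finalPlan, and cursorOptions are assumptions about what FinalizePlan() has in scope; materialize_finished_plan() and CURSOR_OPT_SCROLL are the standard PostgreSQL helper and flag.

```c
#include "nodes/parsenodes.h"   /* CURSOR_OPT_SCROLL */
#include "optimizer/planner.h"  /* materialize_finished_plan() */

/* Sketch: since the Citus scan no longer advertises EXEC_FLAG_BACKWARD,
 * a scrollable cursor gets a Material node on top to absorb backward
 * fetches across batch boundaries. */
static PlannedStmt *
FinalizePlanSketch(PlannedStmt *finalPlan, int cursorOptions)
{
	if (cursorOptions & CURSOR_OPT_SCROLL)
	{
		finalPlan->planTree = materialize_finished_plan(finalPlan->planTree);
	}

	return finalPlan;
}
```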
Early termination

If the consumer stops reading before the query completes (cursor close, LIMIT satisfied), AdaptiveExecutorEnd() handles cleanup:

- cancels in-flight worker queries, and
- drains remaining PQgetResult() data so connections are clean (see the sketch below).

This prevents "another command is already in progress" errors on subsequent statements in the same transaction.
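A minimal drain sketch for one connection, assuming a cancel request has already been sent to the worker; PQgetResult()/PQclear() are standard libpq and the helper name is hypothetical.

```c
/* Consume every pending result so the connection returns to an idle
 * state; without this, the next statement on the same connection fails
 * with "another command is already in progress". */
static void
DrainConnection(PGconn *conn)
{
	PGresult *result = NULL;

	while ((result = PQgetResult(conn)) != NULL)
	{
		PQclear(result);
	}
}
```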
Local execution

Local execution (queries routed to the coordinator's own shards) is not batched; it runs to completion eagerly, same as before. Batching only applies to remote worker connections.
What This PR Does Not Cover

- All tasks still write into a single shared tuplestore. A future extension would give each task its own tuplestore, enabling k-way merge of sorted streams and streaming distributed aggregation.
- INSERT statements are not batched (they complete in one shot).