Skip to content

Fetch tuples in small batches in adaptive executor where possible#5195

Open
marcocitus wants to merge 4 commits intomainfrom
marcocitus/reentrant-executor
Open

Fetch tuples in small batches in adaptive executor where possible#5195
marcocitus wants to merge 4 commits intomainfrom
marcocitus/reentrant-executor

Conversation

@marcocitus
Copy link
Copy Markdown
Contributor

@marcocitus marcocitus commented Aug 20, 2021

DESCRIPTION: Fetch tuples in small batches in adaptive executor where possible

Motivation

Currently adaptive executor runs the entire distributed query to completion
on the first ExecScan call, buffering all rows before returning any to the
client. For large result sets this can mean:

  • Excessive memory / disk I/O: Spill to disk when result size exceeds
    work_mem, even when the client only needs a few rows (e.g. cursors, LIMIT).
  • Per-row libpq overhead: PQsetSingleRowMode() returns exactly
    one row per PQgetResult() call. A 200K-row result makes ~200K
    PQgetResult() calls, each allocating and freeing a PGresult.
  • Long time-to-first-row: The first row is not available until the
    entire result set has been fetched from all workers.

Batched Adaptive Executor

This PR makes RunDistributedExecution re-entrant: instead of
running to completion, it stops after collecting maxBatchSize rows into
the tuplestore, returns control to PostgreSQL, and resumes where it left
off when PostgreSQL asks for more rows.

Adaptive Executor entry points

The executor is split from a single AdaptiveExecutor() call into:

  • AdaptiveExecutorStart() — called once on the first ExecScan.
    Sets up connections, computes maxBatchSize, creates the WaitEventSet.
  • AdaptiveExecutorRun() — called repeatedly. Each call runs
    RunDistributedExecution to fill one batch and returns false (more
    rows available) or true (query complete).
  • AdaptiveExecutorEnd() — called on EndScan or early termination
    (cursor close, LIMIT). If the latter, cancels in-flight worker queries,
    drains pending results, and releases connections.

CitusExecScan reads from the tuplestore until it's empty, then calls
AdaptiveExecutorRun again for the next batch. Between batches the
tuplestore is cleared via tuplestore_clear().

Adaptive batch sizing

CalculateMaxBatchSize() sizes each batch to fit within work_mem. It
estimates the per-row memory footprint from the TupleDesc (fixed-width
attribute lengths, 128 bytes for unbounded varlena, typmod-based estimate
for bounded varlena) and divides work_mem by that estimate, clamped to
a floor of 100 and ceiling of 1M rows. The tuplestore should never spill
to disk during normal operation.

The GUC citus.executor_batch_size overrides the auto-calculation with
a fixed row count (default 0 = auto).

Chunked libpq mode (PG17+)

On PostgreSQL 17 and later, PQsetChunkedRowsMode(conn, chunkSize) is
used instead of PQsetSingleRowMode(). Each PQgetResult() returns up
to chunkSize rows (default 8192, configurable via
citus.executor_chunk_size). For a 200K-row result this is ~24
PQgetResult() calls instead of ~200K — roughly an 8,000x reduction in
libpq overhead.

On PG16, PQsetSingleRowMode() is still used, but the batching and
re-entrancy benefits still apply.

WaitEventSet preservation

The WaitEventSet is created once in AdaptiveExecutorStart and reused
across all batches of the same query. Previously, even in the single-shot
code path, the event set was created once per execution; now it simply
persists until AdaptiveExecutorEnd frees it.

Scrollable cursor support (Material node insertion)

The Citus CustomScan now does not advertise backward-scan capability
(EXEC_FLAG_BACKWARD), because the tuplestore is cleared between batches
and cannot support backward fetches across batch boundaries.

For SCROLL CURSOR queries, FinalizePlan() now calls PostgreSQL's
materialize_finished_plan() to wrap the Citus CustomScan in a
Material node. The Material node has its own tuplestore that
accumulates all rows across batches, supporting full backward scan. Since
Material is lazy (it doesn't force the child to run to completion
upfront), batching benefits are preserved: rows are pulled on demand.

Cursors declared without SCROLL work naturally — forward-only iteration
drains one batch at a time without materialization.

Early termination

If the consumer stops reading before the query completes (cursor close,
LIMIT satisfied), AdaptiveExecutorEnd() handles cleanup:

  1. Sends cancel requests to workers with in-progress queries
  2. Drains any pending PQgetResult() data so connections are clean
  3. Unclaims connections back to the connection pool

This prevents "another command is already in progress" errors on
subsequent statements in the same transaction.

Local execution

Local execution (queries routed to the coordinator's own shards) is not
batched — it runs to completion eagerly, same as before. Batching only
applies to remote worker connections.

What This PR Does Not Cover

  • Per-task tuplestores: All workers currently write into a single
    shared tuplestore. A future extension would give each task its own
    tuplestore, enabling k-way merge of sorted streams and streaming
    distributed aggregation.
  • Multi-row INSERT batching: Multi-row INSERT statements are not
    batched (they complete in one shot).

@codecov
Copy link
Copy Markdown

codecov Bot commented Aug 20, 2021

Codecov Report

❌ Patch coverage is 86.08696% with 16 lines in your changes missing coverage. Please review.
✅ Project coverage is 88.77%. Comparing base (48e83f7) to head (de8d6d4).
⚠️ Report is 3 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #5195      +/-   ##
==========================================
- Coverage   88.90%   88.77%   -0.14%     
==========================================
  Files         286      287       +1     
  Lines       63686    64123     +437     
  Branches     8010     8067      +57     
==========================================
+ Hits        56619    56924     +305     
- Misses       4761     4870     +109     
- Partials     2306     2329      +23     
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

ResetExplainAnalyzeData(taskList);
MemoryContext memoryContext = AllocSetContextCreate(executorState->es_query_cxt,
"AdaptiveExecutor",
ALLOCSET_DEFAULT_SIZES);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: maybe reset this context at the very end since it uses es_query_ctx

Comment thread src/backend/distributed/executor/adaptive_executor.c Outdated
@DS-AdamMilazzo
Copy link
Copy Markdown

I hope this feature makes it in. :-)

@colm-mchugh colm-mchugh force-pushed the marcocitus/reentrant-executor branch from f33de24 to a8d14ee Compare April 7, 2026 17:05
@marcocitus
Copy link
Copy Markdown
Contributor Author

@microsoft-github-policy-service agree [company="Snowflake"]

Though this work was done during my Microsoft/Citus Data tenure, so already belongs to Microsoft.

@marcocitus
Copy link
Copy Markdown
Contributor Author

@microsoft-github-policy-service agree

@colm-mchugh colm-mchugh force-pushed the marcocitus/reentrant-executor branch 3 times, most recently from c140524 to 4200722 Compare April 9, 2026 09:31
@colm-mchugh colm-mchugh marked this pull request as ready for review April 9, 2026 10:05
@colm-mchugh colm-mchugh requested a review from tejeswarm April 9, 2026 10:05
@colm-mchugh colm-mchugh force-pushed the marcocitus/reentrant-executor branch 5 times, most recently from caaf155 to 9df3b85 Compare April 16, 2026 10:53
@colm-mchugh colm-mchugh force-pushed the marcocitus/reentrant-executor branch 6 times, most recently from 07bf04f to 8eb2656 Compare April 21, 2026 09:44
marcoslot and others added 2 commits April 28, 2026 12:14
…d execution, tests.

- Resource clean-up: AdaptiveExecutorEnd() releases sessions/connections when an error occurs between AdaptiveExecutorRun calls. Also handle early termination (cursor close, LIMIT satisfied) with proper clean-up of in-flight worker queries.

- ShouldRunTasksSequentially() check in FinishDistributedExecution() replaced with explicit sessionsCleanedUp flag on DistributedExecution struct. Fixes double CleanUpSessions on sequential path.

- Adaptive batch sizing via citus.executor_batch_size (default 0 => auto). Auto mode calculates batch size from work_mem and TupleDesc (attlen + typmod for varlena, 128B default for unbounded).  Floor 100, ceiling 1M rows.

- Remote execution uses LibPQ's chunked mode (PG17+), GUC configurable for now.

- Local execution is eager; it runs to completion.

- Regress test suite: 11 test cases covering batch sizes 1/10/100K/auto, empty results, LIMIT, aggregation, DML RETURNING, GUC behavior, between-batch error cleanup, cursor close mid-batch and cross-batch-size result consistency.
- The wait event set was getting re-created for every batch. Now it
is preserved across the entire distributed execution.

- Also remove distributed planner's marking of the Citus scan with
backward scan support. This requires FinalizePlan() to re-insert a
Material node if a scrollable cursor has been requested.

- Update backward scan tests to declare cursors as scrollable. This
had a knock-on impact on `multi_mx_alter_distributed_table` because
of cleanup added to `multi_mx_reference_table`. Also add `EXPLAIN
ANALYZE` tests.
@colm-mchugh colm-mchugh force-pushed the marcocitus/reentrant-executor branch from d9db2bd to f626550 Compare May 1, 2026 16:34
Comment thread src/backend/distributed/executor/adaptive_executor.c Outdated
Comment thread src/backend/distributed/executor/adaptive_executor.c Outdated
Comment thread src/backend/distributed/executor/adaptive_executor.c Outdated
Comment thread src/backend/distributed/executor/adaptive_executor.c
Comment thread src/backend/distributed/shared_library_init.c
@colm-mchugh colm-mchugh force-pushed the marcocitus/reentrant-executor branch from 0983333 to ec127da Compare May 5, 2026 09:08
@colm-mchugh colm-mchugh force-pushed the marcocitus/reentrant-executor branch from ec127da to de8d6d4 Compare May 5, 2026 09:30
Copy link
Copy Markdown
Contributor

@codeforall codeforall left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants