[Proposal]: Bounded-memory GroupBy aggregation (streaming merge + streaming result) #2472

@MrHDOLEK

Describe the Proposal

Aggregation is a blocking stage, so GroupBy must hold per-group state until all input is consumed. Today that footprint is unbounded in two places:

  1. GroupBy::result() materializes the entire result as a single Rows (one Row/Entry/Definition/Type graph per distinct group).
  2. Even with the configurable spill storage from "Storage for Aggregating Functions" (#1390), the MEMORY_FALLBACK_EXTERNAL backend does not bound peak memory. ExternalAggregationStorage::all() reloads every spilled bucket into one in-memory $merged map, and result() then builds a second full Rows. At high cardinality the "spill" backend therefore uses more peak memory than the in-memory one (measured with groupBy('email')->aggregate(count(), sum()): filesystem ≈ 432 MB vs. memory ≈ 305 MB at ~98k groups; ~1.1 GB at 300k groups). Spilling bounds memory during ingestion, but all() and result() then un-bound it again.

Goal: make GroupBy aggregation memory-bounded end-to-end on the external path, i.e. O(batchSize + bucketCount) instead of O(distinctGroups), and drop the redundant full-result copy for every backend. This is what makes MEMORY_FALLBACK_EXTERNAL actually scale beyond RAM. It also aligns GroupBy with Flow's streaming contract: it is currently the only operator that emits one giant batch instead of a stream of Rows.

Two coupled changes are needed. Neither alone fixes the filesystem path: one removes $merged, the other removes the result Rows, so both are required.

  • Streaming k-way merge (storage): write each spill bucket sorted by group hash (one record per entry), then merge the buckets with a min-heap, yielding one fully merged group at a time. No $merged map is built.
  • Streaming result (core): GroupBy emits the result in batches instead of one Rows. The downstream pipeline already consumes Rows batch by batch, so this flows through to file and stream loaders unchanged.
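
A minimal sketch of the streaming k-way merge, assuming each spilled bucket can be read back as an Iterator already sorted by group key, and that per-group state is a simple count/sum pair standing in for the real GroupEntry. RunHeadHeap and mergeSortedRuns are hypothetical names, not Flow classes. The heap only ever holds the current head of each run, so memory stays at roughly one entry per bucket plus the group being merged.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical min-heap of [groupKey, runIndex, state] triples ordered by groupKey.
 * SplMinHeap::compare() must return a positive int when $value1 should be
 * extracted first, i.e. when its key sorts lower.
 */
final class RunHeadHeap extends SplMinHeap
{
    protected function compare(mixed $value1, mixed $value2): int
    {
        return strcmp($value2[0], $value1[0]);
    }
}

/**
 * Hypothetical merge: streams sorted runs, yielding one fully merged group at a time.
 *
 * @param list<Iterator<string, array{count: int, sum: int}>> $runs each sorted by key
 * @return Generator<string, array{count: int, sum: int}>
 */
function mergeSortedRuns(array $runs): Generator
{
    $heap = new RunHeadHeap();

    // Push the current head of run $i onto the heap, if the run is not exhausted.
    $push = static function (int $i) use ($runs, $heap): void {
        if ($runs[$i]->valid()) {
            $heap->insert([(string) $runs[$i]->key(), $i, $runs[$i]->current()]);
        }
    };

    foreach ($runs as $i => $run) {
        $run->rewind();
        $push($i);
    }

    while (!$heap->isEmpty()) {
        [$key, $i, $merged] = $heap->extract();
        $runs[$i]->next();
        $push($i);

        // Fold in every other head (from any run) that carries the same group key.
        while (!$heap->isEmpty() && $heap->top()[0] === $key) {
            [, $j, $state] = $heap->extract();
            $merged = [
                'count' => $merged['count'] + $state['count'],
                'sum' => $merged['sum'] + $state['sum'],
            ];
            $runs[$j]->next();
            $push($j);
        }

        yield $key => $merged;
    }
}

$runA = new ArrayIterator(['a' => ['count' => 2, 'sum' => 10], 'c' => ['count' => 1, 'sum' => 5]]);
$runB = new ArrayIterator(['a' => ['count' => 1, 'sum' => 3], 'b' => ['count' => 4, 'sum' => 8]]);

foreach (mergeSortedRuns([$runA, $runB]) as $group => $state) {
    echo $group, ': count=', $state['count'], ', sum=', $state['sum'], PHP_EOL;
}
// Prints:
// a: count=3, sum=13
// b: count=4, sum=8
// c: count=1, sum=5
```

Because groups leave the heap in key order, a merge like this naturally emits groups sorted by hash rather than in insertion order, which is the ordering question raised under the semantics decisions.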

API Adjustments

No user-facing DataFrame/DSL API changes; only internal behavior changes. Existing groupBy() / aggregate() usage is unchanged.

Internal:

  • GroupBucketsCache: get(): array → stream(string): Generator<string, GroupEntry>; the spill format becomes one sorted record per entry.
  • ExternalAggregationStorage::all(): streaming k-way merge (reusing the SplMinHeap pattern from ExternalSort).
  • GroupBy: add resultStream(FlowContext, int $batchSize): Generator<Rows>; GroupByProcessor consumes it via yield from. result() is kept for the pivot path, fetch(), and backward compatibility. The batch size comes from the existing configuration.
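
A minimal sketch of the batching behind resultStream(), assuming the merged groups arrive as a lazy iterable. Plain arrays stand in for Flow's Row/Rows objects, and resultStream()/groupByProcessor() here are hypothetical free functions, not the real GroupBy or GroupByProcessor signatures (FlowContext is omitted).

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical stand-in for GroupBy::resultStream(): re-chunks a lazy stream of
 * merged groups into batches of at most $batchSize rows. Arrays stand in for Rows.
 *
 * @param iterable<string, array{count: int, sum: int}> $groups
 * @return Generator<int, list<array{group: string, count: int, sum: int}>>
 */
function resultStream(iterable $groups, int $batchSize): Generator
{
    // Checked lazily: generators run this on first iteration, not on call.
    if ($batchSize < 1) {
        throw new InvalidArgumentException('Batch size must be at least 1.');
    }

    $batch = [];
    foreach ($groups as $group => $state) {
        $batch[] = ['group' => (string) $group] + $state;

        if (count($batch) === $batchSize) {
            yield $batch; // only this batch is held by the result stage
            $batch = [];
        }
    }

    if ($batch !== []) {
        yield $batch; // trailing partial batch
    }
}

/** Hypothetical processor: forwards each batch downstream instead of one giant Rows. */
function groupByProcessor(iterable $groups, int $batchSize): Generator
{
    yield from resultStream($groups, $batchSize);
}

$groups = (static function (): Generator {
    foreach (['a', 'b', 'c', 'd', 'e'] as $i => $key) {
        yield $key => ['count' => 1, 'sum' => $i];
    }
})();

foreach (groupByProcessor($groups, 2) as $batch) {
    echo 'batch of ', count($batch), ' starting at ', $batch[0]['group'], PHP_EOL;
}
// Prints:
// batch of 2 starting at a
// batch of 2 starting at c
// batch of 1 starting at e
```

With a lazy upstream, peak memory for the result stage is one batch plus whatever the upstream iterator holds, instead of a full Rows covering every distinct group.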

Semantics decisions needed (the reason this touches core):

  • Output ordering: the memory backend yields groups in insertion order; the external merge yields them sorted by group hash. GroupBy does not currently guarantee an order, so this needs a contract decision.
  • to_output table mode: StreamLoader closes the output stream after the first batch, and AsciiTableFormatter prints a header per batch, so multi-batch groupBy output breaks interactive table rendering. This requires fixing StreamLoader (write the header once, don't close the stream mid-pipeline) or special-casing groupBy output. This is the main cross-cutting change.
  • fetch() is unaffected (it materializes by design); pivot stays materialized (it is a small, separate path).
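
For the to_output table mode, the StreamLoader fix could look roughly like the writer below. This is a hypothetical sketch, not Flow's StreamLoader or AsciiTableFormatter API: StreamingTableWriter is an invented name, and it assumes the column list is known up front. It writes the header only for the first batch and leaves closing the stream to an explicit close() after the last batch.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical streaming table writer: header written once, rows appended per
 * batch, stream left open until close() is called after the last batch.
 */
final class StreamingTableWriter
{
    private bool $headerWritten = false;

    /**
     * @param resource $stream
     * @param list<string> $columns
     */
    public function __construct(private $stream, private readonly array $columns)
    {
    }

    /** @param list<array<string, scalar>> $batch */
    public function write(array $batch): void
    {
        // Header only before the first batch, never repeated per batch.
        if (!$this->headerWritten) {
            fwrite($this->stream, implode(' | ', $this->columns) . PHP_EOL);
            $this->headerWritten = true;
        }

        foreach ($batch as $row) {
            $cells = array_map(
                static fn (string $column): string => (string) ($row[$column] ?? ''),
                $this->columns,
            );
            fwrite($this->stream, implode(' | ', $cells) . PHP_EOL);
        }
    }

    /** Called once, after the final batch, never between batches. */
    public function close(): void
    {
        fclose($this->stream);
    }
}

$writer = new StreamingTableWriter(STDOUT, ['group', 'count']);
$writer->write([['group' => 'a', 'count' => 3], ['group' => 'b', 'count' => 4]]);
$writer->write([['group' => 'c', 'count' => 1]]);
```

A table renderer that sizes columns from the rows it has already seen cannot know the widths of future batches, so a streaming variant would likely need fixed widths or widths taken from the first batch. That is an assumption, not something the proposal specifies.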

Are you intending to also work on proposed change?

Yes

Are you interested in sponsoring this change?

No

Integration & Dependencies

No new dependencies. Reuses existing primitives (SplMinHeap as in ExternalSort, the configured Serializer, Filesystem).

Metadata

Assignees

Labels

No labels

Projects

Status
Todo

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests