Skip to content

fix(delta): drain all batches per scan file in DeltaInputSourceIterator#19592

Merged
abhishekrb19 merged 5 commits into
apache:masterfrom
rinchinov:fix/delta-lake-batch-iteration-18606
Jun 18, 2026
Merged

fix(delta): drain all batches per scan file in DeltaInputSourceIterator#19592
abhishekrb19 merged 5 commits into
apache:masterfrom
rinchinov:fix/delta-lake-batch-iteration-18606

Conversation

@rinchinov

Copy link
Copy Markdown
Contributor

Fixes #18606.

Description

Bug

DeltaInputSourceIterator.hasNext() used a local variable for the per-file
CloseableIterator<FilteredColumnarBatch>. When the method returned true after
finding the first non-empty batch of a file, that iterator went out of scope.
The next hasNext() call advanced to the next file, skipping all remaining
batches of the current file.

With the Delta kernel's default batch size of 1024 rows, this produced exactly
1024 × numFiles rows regardless of actual file size — matching the symptom
reported in #18606.

Fix

Promoted filteredBatchIterator to a class field (currentFileIterator).
hasNext() now drains all batches of the current file before advancing to the
next one. Also fixed close() to close currentFileIterator and drain all
remaining file iterators (the original only closed one).

Regression test

Added LargeRowGroupDeltaTable (2 Parquet files × 2000 rows = 4000 total) and
BatchDrainRegressionTests inside DeltaInputSourceTest.

  • Without the fix: 1024 × 2 = 2048 rows returned
  • With the fix: 4000 rows returned

Release note

Fixed a bug in the Delta Lake input source where only 1024 rows per Parquet file
were ingested. Ingestion tasks now return all rows from each file.


Key changed/added classes in this PR
  • DeltaInputSourceReader (fix)
  • LargeRowGroupDeltaTable (new test descriptor)
  • DeltaInputSourceTest (new BatchDrainRegressionTests inner class)
  • src/test/resources/large-row-group-table (new test Delta table)

This PR has:

  • been self-reviewed.
  • a release note entry in the PR description.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.

Fixes apache#18606 — only 1024 rows ingested per Parquet file when using the
Delta Lake input source.

Root cause: filteredBatchIterator was a local variable inside hasNext().
When the method returned true after the first non-empty batch of a file,
the iterator went out of scope. The next hasNext() call advanced to the
next file, skipping all remaining batches of the current file.

With Delta kernel's default batch size of 1024 rows, this caused exactly
1024 rows × N files to be ingested regardless of actual file size.

Fix: promote filteredBatchIterator to a field (currentFileIterator) so
it survives across hasNext() calls and all batches of a file are drained
before advancing to the next file.

Also fixed close() to properly close currentFileIterator and drain all
remaining file iterators.
Adds a Delta table with 2 Parquet files × 2000 rows (total 4000) where
each file exceeds the Delta kernel's default batch size of 1024 rows.

Without the fix: DeltaInputSourceIterator returns 1024 × 2 = 2048 rows.
With the fix:    all 4000 rows are returned correctly.

Test: DeltaInputSourceBatchDrainTest.testAllRowsReturnedWhenFilesExceedOneBatch
…apacheGH-18606

Adds LargeRowGroupDeltaTable (2 files × 2000 rows = 4000 total) and a
BatchDrainRegressionTests inner class inside DeltaInputSourceTest following
the same pattern as existing test classes.

The regression test fails with the bug (returns 1024 × 2 = 2048 rows)
and passes with the fix (returns all 4000 rows).

@FrankChen021 FrankChen021 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Severity Findings
P0 0
P1 1
P2 0
P3 0
Total 1
Severity Findings
P0 0
P1 1
P2 0
P3 0
Total 1

Reviewed 9 of 9 changed files.


This is an automated review by Codex GPT-5.5

@rinchinov rinchinov requested a review from FrankChen021 June 17, 2026 15:14

@FrankChen021 FrankChen021 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Severity Findings
P0 0
P1 1
P2 0
P3 0
Total 1

Reviewed 9 of 9 changed files.


This is an automated review by Codex GPT-5.5

if (!filteredColumnarBatchIterators.hasNext()) {
return false;
}
currentFileIterator = filteredColumnarBatchIterators.next();

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Close exhausted file iterators before advancing

The latest commit only adds a newline, so this remains from the full PR diff: after draining currentFileIterator, hasNext() overwrites it with the next scan-file iterator without closing the exhausted one. These iterators come from Scan.transformPhysicalData(), whose close() closes the underlying Parquet physicalDataIter, so a fully read multi-file table leaks every completed file reader except the last one closed by DeltaInputSourceIterator.close(). Large Delta ingestions can therefore accumulate open Parquet readers/file handles. Close the exhausted currentFileIterator before assigning the next one, or otherwise ensure each per-file iterator is closed once it is drained.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for mentioning it, applied closing iterator

Each per-file iterator from Scan.transformPhysicalData() owns an
underlying Parquet reader/file handle. hasNext() overwrote
currentFileIterator with the next file without closing the exhausted
one, leaking a handle per completed file on multi-file tables (close()
only closed the last and the never-started iterators). Now close the
drained iterator before advancing.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@rinchinov rinchinov requested a review from FrankChen021 June 18, 2026 13:30

@abhishekrb19 abhishekrb19 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, thanks for the fix!

@abhishekrb19 abhishekrb19 merged commit 3228e3e into apache:master Jun 18, 2026
38 checks passed
@github-actions github-actions Bot added this to the 38.0.0 milestone Jun 18, 2026
@abhishekrb19

Copy link
Copy Markdown
Contributor

@rinchinov, if you’re interested in adding an embedded test for the Delta lake extension which is an end-to-end integration test, that would be a good addition. There’s one for the Iceberg extension here: https://github.com/apache/druid/tree/master/embedded-tests/src/test/java/org/apache/druid/testing/embedded/iceberg

We could try something similar for the delta-lake extension as well.

@rinchinov

Copy link
Copy Markdown
Contributor Author

Added the embedded end-to-end ingestion test for the Delta Lake input source as a separate (stacked) draft PR: #19611. It reuses the 4000-row regression table from this PR and is green only with the fix here, so it's kept as a draft until this one merges.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Druid Delta Table load Error - Unable to load full table, restricted to 1024 records per parquet file

3 participants