fix(delta): drain all batches per scan file in DeltaInputSourceIterator #19592
Fixes apache#18606: only 1024 rows ingested per Parquet file when using the Delta Lake input source.

Root cause: filteredBatchIterator was a local variable inside hasNext(). When the method returned true after the first non-empty batch of a file, the iterator went out of scope. The next hasNext() call advanced to the next file, skipping all remaining batches of the current file. With Delta kernel's default batch size of 1024 rows, this caused exactly 1024 rows × N files to be ingested regardless of actual file size.

Fix: promote filteredBatchIterator to a field (currentFileIterator) so it survives across hasNext() calls and all batches of a file are drained before advancing to the next file. Also fixed close() to properly close currentFileIterator and drain all remaining file iterators.
Adds a Delta table with 2 Parquet files × 2000 rows (total 4000) where each file exceeds the Delta kernel's default batch size of 1024 rows. Without the fix: DeltaInputSourceIterator returns 1024 × 2 = 2048 rows. With the fix: all 4000 rows are returned correctly. Test: DeltaInputSourceBatchDrainTest.testAllRowsReturnedWhenFilesExceedOneBatch
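The 2048 versus 4000 row counts can be reproduced with a small, self-contained simulation. This is only a sketch of the iteration shape, not the Druid or Delta kernel code: BatchDrainSketch, FirstBatchOnly, and DrainAll are hypothetical names, and each "batch" is modeled as a plain row count.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical model: a "file" is an iterator of batch sizes, not real Delta kernel batches.
final class BatchDrainSketch
{
  static final int BATCH_SIZE = 1024; // default batch size quoted in this PR

  // Splits one file of rowCount rows into batch sizes, e.g. 2000 -> [1024, 976].
  static Iterator<Integer> batchesOf(int rowCount)
  {
    List<Integer> batches = new ArrayList<>();
    for (int remaining = rowCount; remaining > 0; remaining -= BATCH_SIZE) {
      batches.add(Math.min(BATCH_SIZE, remaining));
    }
    return batches.iterator();
  }

  static Iterator<Iterator<Integer>> files(int fileCount, int rowsPerFile)
  {
    List<Iterator<Integer>> result = new ArrayList<>();
    for (int i = 0; i < fileCount; i++) {
      result.add(batchesOf(rowsPerFile));
    }
    return result.iterator();
  }

  static int totalRows(Iterator<Integer> batches)
  {
    int rows = 0;
    while (batches.hasNext()) {
      rows += batches.next();
    }
    return rows;
  }

  // Buggy shape: the per-file iterator is a local, so it is lost after its first batch.
  static final class FirstBatchOnly implements Iterator<Integer>
  {
    private final Iterator<Iterator<Integer>> files;
    private Integer pending;

    FirstBatchOnly(Iterator<Iterator<Integer>> files) { this.files = files; }

    @Override
    public boolean hasNext()
    {
      while (pending == null && files.hasNext()) {
        Iterator<Integer> fileBatches = files.next(); // local variable, discarded on return
        if (fileBatches.hasNext()) {
          pending = fileBatches.next();
        }
      }
      return pending != null;
    }

    @Override
    public Integer next()
    {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      Integer batch = pending;
      pending = null;
      return batch;
    }
  }

  // Fixed shape: the per-file iterator is a field and is drained before advancing.
  static final class DrainAll implements Iterator<Integer>
  {
    private final Iterator<Iterator<Integer>> files;
    private Iterator<Integer> currentFile; // plays the role of currentFileIterator

    DrainAll(Iterator<Iterator<Integer>> files) { this.files = files; }

    @Override
    public boolean hasNext()
    {
      while (currentFile == null || !currentFile.hasNext()) {
        if (!files.hasNext()) {
          return false;
        }
        currentFile = files.next();
      }
      return true;
    }

    @Override
    public Integer next()
    {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      return currentFile.next();
    }
  }
}
```

With `files(2, 2000)` as input, summing the batches through FirstBatchOnly gives 2048 and through DrainAll gives 4000, which matches the numbers in the test description.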
…apacheGH-18606 Adds LargeRowGroupDeltaTable (2 files × 2000 rows = 4000 total) and a BatchDrainRegressionTests inner class inside DeltaInputSourceTest following the same pattern as existing test classes. The regression test fails with the bug (returns 1024 × 2 = 2048 rows) and passes with the fix (returns all 4000 rows).
FrankChen021
left a comment
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 1 |
| P2 | 0 |
| P3 | 0 |
| Total | 1 |
Reviewed 9 of 9 changed files.
This is an automated review by Codex GPT-5.5
FrankChen021
left a comment
    if (!filteredColumnarBatchIterators.hasNext()) {
      return false;
    }
    currentFileIterator = filteredColumnarBatchIterators.next();
[P1] Close exhausted file iterators before advancing
The latest commit only adds a newline, so this remains from the full PR diff: after draining currentFileIterator, hasNext() overwrites it with the next scan-file iterator without closing the exhausted one. These iterators come from Scan.transformPhysicalData(), whose close() closes the underlying Parquet physicalDataIter, so a fully read multi-file table leaks every completed file reader except the last one closed by DeltaInputSourceIterator.close(). Large Delta ingestions can therefore accumulate open Parquet readers/file handles. Close the exhausted currentFileIterator before assigning the next one, or otherwise ensure each per-file iterator is closed once it is drained.
Thanks for mentioning it. The exhausted iterator is now closed before advancing.
Each per-file iterator from Scan.transformPhysicalData() owns an underlying Parquet reader/file handle. hasNext() overwrote currentFileIterator with the next file without closing the exhausted one, leaking a handle per completed file on multi-file tables (close() only closed the last and the never-started iterators). Now close the drained iterator before advancing. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
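The close-before-advance behavior described in this commit can be sketched roughly as follows. This is an illustration under assumptions, not the actual patch: TrackedFile is a hypothetical stand-in for the closeable per-file iterator, Reader is a hypothetical stand-in for DeltaInputSourceIterator, and batches are modeled as plain row counts.

```java
import java.io.Closeable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

final class CloseOnAdvanceSketch
{
  // Hypothetical per-file iterator that records whether its "file handle" was released.
  static final class TrackedFile implements Iterator<Integer>, Closeable
  {
    private final Iterator<Integer> batches;
    boolean closed;

    TrackedFile(Integer... batchSizes) { this.batches = Arrays.asList(batchSizes).iterator(); }

    @Override
    public boolean hasNext() { return batches.hasNext(); }

    @Override
    public Integer next() { return batches.next(); }

    @Override
    public void close() { closed = true; }
  }

  // Hypothetical outer iterator over all files of a table.
  static final class Reader implements Iterator<Integer>, Closeable
  {
    private final Iterator<TrackedFile> files;
    private TrackedFile currentFile;

    Reader(List<TrackedFile> files) { this.files = files.iterator(); }

    @Override
    public boolean hasNext()
    {
      while (currentFile == null || !currentFile.hasNext()) {
        if (currentFile != null) {
          currentFile.close(); // release the drained file before moving to the next one
          currentFile = null;
        }
        if (!files.hasNext()) {
          return false;
        }
        currentFile = files.next();
      }
      return true;
    }

    @Override
    public Integer next()
    {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      return currentFile.next();
    }

    @Override
    public void close()
    {
      // Close the file being read, then every file that was never started.
      if (currentFile != null) {
        currentFile.close();
        currentFile = null;
      }
      while (files.hasNext()) {
        files.next().close();
      }
    }
  }
}
```

In this sketch at most one TrackedFile is open at any time during iteration, and Reader.close() covers an early exit by closing both the in-progress file and the ones not yet opened.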
abhishekrb19
left a comment
Good catch, thanks for the fix!
@rinchinov, if you're interested in adding an embedded test for the Delta Lake extension, which is an end-to-end integration test, that would be a good addition. There's one for the Iceberg extension here: https://github.com/apache/druid/tree/master/embedded-tests/src/test/java/org/apache/druid/testing/embedded/iceberg We could try something similar for the delta-lake extension as well.

Added the embedded end-to-end ingestion test for the Delta Lake input source as a separate (stacked) draft PR: #19611. It reuses the 4000-row regression table from this PR and is green only with the fix here, so it's kept as a draft until this one merges.
Fixes #18606.
Description
Bug
DeltaInputSourceIterator.hasNext() used a local variable for the per-file CloseableIterator<FilteredColumnarBatch>. When the method returned true after finding the first non-empty batch of a file, that iterator went out of scope. The next hasNext() call advanced to the next file, skipping all remaining batches of the current file.

With the Delta kernel's default batch size of 1024 rows, this produced exactly 1024 × numFiles rows regardless of actual file size, matching the symptom reported in #18606.
Fix
Promoted filteredBatchIterator to a class field (currentFileIterator). hasNext() now drains all batches of the current file before advancing to the next one. Also fixed close() to close currentFileIterator and drain all remaining file iterators (the original only closed one).
Regression test
Added LargeRowGroupDeltaTable (2 Parquet files × 2000 rows = 4000 total) and BatchDrainRegressionTests inside DeltaInputSourceTest.
Without the fix: 1024 × 2 = 2048 rows returned
With the fix: 4000 rows returned
Release note
Fixed a bug in the Delta Lake input source where only 1024 rows per Parquet file
were ingested. Ingestion tasks now return all rows from each file.
Key changed/added classes in this PR
- DeltaInputSourceReader (fix)
- LargeRowGroupDeltaTable (new test descriptor)
- DeltaInputSourceTest (new BatchDrainRegressionTests inner class)
- src/test/resources/large-row-group-table (new test Delta table)

This PR has: