[client] Implement BatchScanner with limit-based scan #515
fresh-borzoni
left a comment
@Arnav-panjla Ty for the PR, left some comments, PTAL
@Arnav-panjla Do you need any help? Will you be able to address the comments in the review?
@Arnav-panjla Let us know if you wish to continue working on this PR, or we can take it over to finish.
Very sorry,
@Arnav-panjla No rush, I just need to understand whether you wish to continue working on this.
e43866c to
9ca191f
Implements a one-shot bounded BatchScanner backed by a single LimitScanRequest RPC (fixes apache#316):
- adds TableScan::limit and create_batch_scanner
- eager RPC with leader resolution (mirrors Lookuper)
- Arrow IPC (log) and KV -> RecordBatch (PK) decoding
- projection support

Squashed from PR apache#515.
1c3290c to
a7b994f
@Arnav-panjla Ty for the changes, LGTM overall. @charlesdong1991 @leekeiabstraction Can you take a look as well?
a7b994f to
6521684
charlesdong1991
left a comment
Very nice PR! Just a couple of minor comments/questions.
/// Requires a previously-configured limit via [`Self::limit`]. Creation is
/// cheap; the `LimitScanRequest` runs on the first
/// [`LimitBatchScanner::next_batch`].
pub fn create_bucket_batch_scanner(
Should we validate the Arrow format and reject anything else, similar to logscanner?
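A minimal sketch of the kind of up-front check being asked about, assuming the table's log format is known when the scanner is created. `LogFormat`, `ScanConfig`, `ScanError`, and `validate_batch_scan` are hypothetical names for illustration, not types from this PR or from the client crate:

```rust
use std::fmt;

/// Hypothetical stand-in for a table's log format (not the crate's real type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogFormat {
    Arrow,
    Indexed,
}

/// Hypothetical error type for scanner creation.
#[derive(Debug, PartialEq, Eq)]
pub enum ScanError {
    UnsupportedLogFormat(LogFormat),
    MissingLimit,
}

impl fmt::Display for ScanError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ScanError::UnsupportedLogFormat(found) => {
                write!(f, "batch scan requires the Arrow log format, found {found:?}")
            }
            ScanError::MissingLimit => write!(f, "batch scan requires a limit"),
        }
    }
}

impl std::error::Error for ScanError {}

/// Hypothetical subset of what a scan builder knows at creation time.
pub struct ScanConfig {
    pub log_format: LogFormat,
    pub limit: Option<usize>,
}

/// Rejects unsupported configurations before any RPC is issued and
/// returns the validated limit on success.
pub fn validate_batch_scan(cfg: &ScanConfig) -> Result<usize, ScanError> {
    if cfg.log_format != LogFormat::Arrow {
        return Err(ScanError::UnsupportedLogFormat(cfg.log_format));
    }
    cfg.limit.ok_or(ScanError::MissingLimit)
}
```

Failing at creation time keeps the error close to the misconfiguration instead of surfacing it later as a decode failure on the first batch.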
    );
}

/// A bucket id outside the table's bucket range should be rejected by the
Should this be table id, based on the test?
let Some(pending) = self.pending.as_ref() else {
    return Ok(None);
};
let batch = run_limit_scan(pending, &self.bucket).await?;
So we will retry on every error, regardless of whether it is retriable? I wonder whether that could cause issues; maybe add a NOTE for the future in case it does.
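To make the trade-off concrete, here is a synchronous sketch of the alternative: keep the pending request only when the error is retriable, and drop it otherwise. It assumes the snippet above leaves `pending` in place when `?` returns early, which is what the question implies. `OneShotScanner`, `ScanError`, and `is_retriable` are hypothetical names, and a closure stands in for the RPC:

```rust
/// Hypothetical error type that distinguishes retriable from fatal failures.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScanError {
    Retriable(String),
    Fatal(String),
}

impl ScanError {
    pub fn is_retriable(&self) -> bool {
        matches!(self, ScanError::Retriable(_))
    }
}

/// One-shot scanner: the pending request is consumed by the first
/// successful call, or by the first non-retriable failure.
pub struct OneShotScanner<F>
where
    F: FnMut() -> Result<Vec<u64>, ScanError>,
{
    pending: Option<F>,
}

impl<F> OneShotScanner<F>
where
    F: FnMut() -> Result<Vec<u64>, ScanError>,
{
    pub fn new(request: F) -> Self {
        Self { pending: Some(request) }
    }

    pub fn next_batch(&mut self) -> Result<Option<Vec<u64>>, ScanError> {
        let Some(request) = self.pending.as_mut() else {
            return Ok(None);
        };
        match request() {
            Ok(batch) => {
                // Success: the scan is done, later calls yield Ok(None).
                self.pending = None;
                Ok(Some(batch))
            }
            Err(err) => {
                // Keep the request only if retrying can plausibly succeed.
                if !err.is_retriable() {
                    self.pending = None;
                }
                Err(err)
            }
        }
    }
}
```

With this shape, a caller that loops on `next_batch` after a fatal error sees `Ok(None)` rather than re-issuing a request that cannot succeed. Whether that is preferable to surfacing the same error again is a design choice for the PR.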
}

/// Drains the scanner into all of its batches.
pub async fn collect_all_batches(&mut self) -> Result<Vec<ScanBatch>> {
I am curious about the purpose of this function: the length of the result will be either 1 or 0 for the limit batch scanner, I guess.
So is it for other scanner types in Fluss that will be implemented in the future?
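One way to read the question is that a drain helper written once against a shared interface serves both one-shot and multi-batch scanners. The sketch below is synchronous and omits error handling for brevity. `BatchSource`, `OneShot`, and `Chunked` are hypothetical names, not types from this PR:

```rust
/// Hypothetical common interface for batch-producing scanners.
pub trait BatchSource {
    type Batch;

    /// Returns the next batch, or None once the scanner is exhausted.
    fn next_batch(&mut self) -> Option<Self::Batch>;

    /// Drains the scanner into all of its remaining batches.
    fn collect_all_batches(&mut self) -> Vec<Self::Batch> {
        let mut out = Vec::new();
        while let Some(batch) = self.next_batch() {
            out.push(batch);
        }
        out
    }
}

/// One-shot scanner: yields at most one batch.
pub struct OneShot(pub Option<Vec<i32>>);

impl BatchSource for OneShot {
    type Batch = Vec<i32>;

    fn next_batch(&mut self) -> Option<Vec<i32>> {
        self.0.take()
    }
}

/// Multi-batch scanner: yields rows in fixed-size chunks.
pub struct Chunked {
    pub rows: Vec<i32>,
    pub chunk: usize,
}

impl BatchSource for Chunked {
    type Batch = Vec<i32>;

    fn next_batch(&mut self) -> Option<Vec<i32>> {
        if self.rows.is_empty() {
            return None;
        }
        // Guard against a zero chunk size so the drain loop always progresses.
        let n = self.chunk.max(1).min(self.rows.len());
        Some(self.rows.drain(..n).collect())
    }
}
```

For `OneShot` the helper returns zero or one batch, as noted above; for `Chunked` the same default method returns every chunk in order.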
Purpose
Fixes #316
Implement one-shot BatchScanner using LimitScanRequest.

Brief change log
- limit to TableScan
- create_batch_scanner
- BatchScanner (eager RPC, single poll_batch)
- RecordBatch (PK) decoding

Tests
- cargo test
- cargo check, clippy, fmt

API and Format
API Change: Yes
- BatchScanner, TableScan::limit, create_batch_scanner
Storage Format: No

Documentation
No