3535import org .apache .paimon .manifest .PartitionEntry ;
3636import org .apache .paimon .options .Options ;
3737import org .apache .paimon .partition .Partition ;
38+ import org .apache .paimon .predicate .Equal ;
39+ import org .apache .paimon .predicate .In ;
40+ import org .apache .paimon .predicate .LeafPredicate ;
41+ import org .apache .paimon .predicate .LeafPredicateExtractor ;
3842import org .apache .paimon .predicate .Predicate ;
43+ import org .apache .paimon .predicate .PredicateBuilder ;
3944import org .apache .paimon .reader .RecordReader ;
4045import org .apache .paimon .table .FileStoreTable ;
4146import org .apache .paimon .table .ReadonlyTable ;
5661import org .apache .paimon .utils .InternalRowUtils ;
5762import org .apache .paimon .utils .IteratorRecordReader ;
5863import org .apache .paimon .utils .JsonSerdeUtil ;
64+ import org .apache .paimon .utils .PartitionPredicateHelper ;
5965import org .apache .paimon .utils .ProjectedRow ;
6066import org .apache .paimon .utils .SerializationUtils ;
6167
6268import org .apache .paimon .shade .guava30 .com .google .common .collect .Iterators ;
6369
70+ import javax .annotation .Nullable ;
71+
6472import java .io .IOException ;
6573import java .time .Instant ;
6674import java .time .LocalDateTime ;
6775import java .time .ZoneId ;
76+ import java .util .ArrayList ;
6877import java .util .Arrays ;
6978import java .util .Collections ;
7079import java .util .Comparator ;
@@ -140,32 +149,95 @@ public Table copy(Map<String, String> dynamicOptions) {
140149
141150 private static class PartitionsScan extends ReadOnceTableScan {
142151
152+ @ Nullable private LeafPredicate partitionPredicate ;
153+
143154 @ Override
144155 public InnerTableScan withFilter (Predicate predicate ) {
156+ if (predicate == null ) {
157+ return this ;
158+ }
159+
160+ Map <String , LeafPredicate > leafPredicates =
161+ predicate .visit (LeafPredicateExtractor .INSTANCE );
162+ this .partitionPredicate = leafPredicates .get ("partition" );
163+
164+ // Handle Or(Equal, Equal...) pattern from PredicateBuilder.in() with <=20 literals
165+ if (this .partitionPredicate == null ) {
166+ for (Predicate andChild : PredicateBuilder .splitAnd (predicate )) {
167+ LeafPredicate inPred = convertOrEqualsToIn (andChild , "partition" );
168+ if (inPred != null ) {
169+ this .partitionPredicate = inPred ;
170+ break ;
171+ }
172+ }
173+ }
145174 return this ;
146175 }
147176
177+ @ Nullable
178+ private static LeafPredicate convertOrEqualsToIn (Predicate predicate , String targetField ) {
179+ List <Predicate > orChildren = PredicateBuilder .splitOr (predicate );
180+ if (orChildren .size () <= 1 ) {
181+ return null ;
182+ }
183+ List <Object > literals = new ArrayList <>();
184+ String fieldName = null ;
185+ int fieldIndex = -1 ;
186+ DataType fieldType = null ;
187+ for (Predicate child : orChildren ) {
188+ if (!(child instanceof LeafPredicate )) {
189+ return null ;
190+ }
191+ LeafPredicate leaf = (LeafPredicate ) child ;
192+ if (!(leaf .function () instanceof Equal )) {
193+ return null ;
194+ }
195+ if (fieldName == null ) {
196+ fieldName = leaf .fieldName ();
197+ fieldIndex = leaf .index ();
198+ fieldType = leaf .type ();
199+ } else if (!fieldName .equals (leaf .fieldName ())) {
200+ return null ;
201+ }
202+ literals .addAll (leaf .literals ());
203+ }
204+ if (!targetField .equals (fieldName )) {
205+ return null ;
206+ }
207+ return new LeafPredicate (In .INSTANCE , fieldType , fieldIndex , fieldName , literals );
208+ }
209+
148210 @ Override
149211 public Plan innerPlan () {
150- return () -> Collections .singletonList (new PartitionsSplit ());
212+ return () -> Collections .singletonList (new PartitionsSplit (partitionPredicate ));
151213 }
152214 }
153215
154216 private static class PartitionsSplit extends SingletonSplit {
155217
156218 private static final long serialVersionUID = 1L ;
157219
220+ @ Nullable private final LeafPredicate partitionPredicate ;
221+
222+ private PartitionsSplit (@ Nullable LeafPredicate partitionPredicate ) {
223+ this .partitionPredicate = partitionPredicate ;
224+ }
225+
158226 @ Override
159227 public boolean equals (Object o ) {
160228 if (this == o ) {
161229 return true ;
162230 }
163- return o != null && getClass () == o .getClass ();
231+ if (o == null || getClass () != o .getClass ()) {
232+ return false ;
233+ }
234+ PartitionsSplit that = (PartitionsSplit ) o ;
235+ return Objects .equals (partitionPredicate , that .partitionPredicate );
164236 }
165237
166238 @ Override
167239 public int hashCode () {
168- return 1 ;
240+ return Objects . hash ( partitionPredicate ) ;
169241 }
170242
171243 @ Override
@@ -186,7 +258,7 @@ public PartitionsRead(FileStoreTable table) {
186258
187259 @ Override
188260 public InnerTableRead withFilter (Predicate predicate ) {
189- // TODO
261+ // filter pushdown is handled at the Scan layer through PartitionsSplit
190262 return this ;
191263 }
192264
@@ -207,7 +279,8 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
207279 throw new IllegalArgumentException ("Unsupported split: " + split .getClass ());
208280 }
209281
210- List <Partition > partitions = listPartitions ();
282+ PartitionsSplit partitionsSplit = (PartitionsSplit ) split ;
283+ List <Partition > partitions = listPartitions (partitionsSplit .partitionPredicate );
211284
212285 List <DataType > fieldTypes =
213286 fileStoreTable .schema ().logicalPartitionType ().getFieldTypes ();
@@ -320,17 +393,17 @@ private Timestamp toTimestamp(Long epochMillis) {
320393 Instant .ofEpochMilli (epochMillis ), ZoneId .systemDefault ()));
321394 }
322395
323- private List <Partition > listPartitions () {
396+ private List <Partition > listPartitions (@ Nullable LeafPredicate partitionPredicate ) {
324397 CatalogLoader catalogLoader = fileStoreTable .catalogEnvironment ().catalogLoader ();
325398 if (TimeTravelUtil .hasTimeTravelOptions (new Options (fileStoreTable .options ()))
326399 || catalogLoader == null ) {
327- return listPartitionEntries ();
400+ return listPartitionEntries (partitionPredicate );
328401 }
329402
330403 try (Catalog catalog = catalogLoader .load ()) {
331404 Identifier baseIdentifier = fileStoreTable .catalogEnvironment ().identifier ();
332405 if (baseIdentifier == null ) {
333- return listPartitionEntries ();
406+ return listPartitionEntries (partitionPredicate );
334407 }
335408 String branch = fileStoreTable .coreOptions ().branch ();
336409 Identifier identifier ;
@@ -343,15 +416,58 @@ private List<Partition> listPartitions() {
343416 } else {
344417 identifier = baseIdentifier ;
345418 }
346- return catalog .listPartitions (identifier );
419+ List <Partition > partitions = catalog .listPartitions (identifier );
420+ return filterByPredicate (partitions , partitionPredicate );
347421 } catch (Exception e ) {
348- return listPartitionEntries ();
422+ return listPartitionEntries (partitionPredicate );
423+ }
424+ }
425+
426+ private List <Partition > filterByPredicate (
427+ List <Partition > partitions , @ Nullable LeafPredicate partitionPredicate ) {
428+ if (partitionPredicate == null ) {
429+ return partitions ;
349430 }
431+ List <String > partitionKeys = fileStoreTable .partitionKeys ();
432+ String defaultPartitionName = fileStoreTable .coreOptions ().partitionDefaultName ();
433+ return partitions .stream ()
434+ .filter (
435+ p -> {
436+ StringBuilder sb = new StringBuilder ();
437+ for (int i = 0 ; i < partitionKeys .size (); i ++) {
438+ if (i > 0 ) {
439+ sb .append ("/" );
440+ }
441+ String value = p .spec ().get (partitionKeys .get (i ));
442+ sb .append (partitionKeys .get (i ))
443+ .append ("=" )
444+ .append (value == null ? defaultPartitionName : value );
445+ }
446+ return partitionPredicate .test (
447+ GenericRow .of (BinaryString .fromString (sb .toString ())));
448+ })
449+ .collect (Collectors .toList ());
350450 }
351451
352- private List <Partition > listPartitionEntries () {
353- List <PartitionEntry > partitionEntries =
354- fileStoreTable .newScan ().withLevelFilter (level -> true ).listPartitionEntries ();
452+ private List <Partition > listPartitionEntries (@ Nullable LeafPredicate partitionPredicate ) {
453+ InnerTableScan scan = fileStoreTable .newScan ().withLevelFilter (level -> true );
454+
455+ if (partitionPredicate != null ) {
456+ List <String > partitionKeys = fileStoreTable .partitionKeys ();
457+ RowType partitionType = fileStoreTable .schema ().logicalPartitionType ();
458+ String defaultPartitionName = fileStoreTable .coreOptions ().partitionDefaultName ();
459+ PartitionPredicateHelper .PushdownResult pushdownResult =
460+ PartitionPredicateHelper .buildPartitionPredicate (
461+ partitionPredicate ,
462+ partitionKeys ,
463+ partitionType ,
464+ defaultPartitionName );
465+ if (pushdownResult .predicate () != null ) {
466+ scan .withPartitionFilter (pushdownResult .predicate ());
467+ }
468+ }
469+
470+ List <PartitionEntry > partitionEntries = scan .listPartitionEntries ();
355471 RowType partitionType = fileStoreTable .schema ().logicalPartitionType ();
356472 String defaultPartitionName = fileStoreTable .coreOptions ().partitionDefaultName ();
357473 String [] partitionColumns = fileStoreTable .partitionKeys ().toArray (new String [0 ]);
0 commit comments