Skip to content

Commit 1116648

Browse files
sundapeng and claude committed
[core] Support partition predicate pushdown for PartitionsTable
PartitionsTable.withFilter() was a no-op (TODO), causing full manifest scans when querying with partition filters. This adds predicate pushdown following the same pattern as BucketsTable (#7592) and FilesTable (#7376).

Key changes:
- PartitionsScan extracts the partition predicate via LeafPredicateExtractor
- PartitionsSplit carries the predicate to PartitionsRead
- Catalog path: in-memory filter preserving metadata columns
- TableScan path: manifest-level pushdown via withPartitionFilter
- PartitionPredicateHelper refactored to a build+apply two-step pattern
- parsePartitionSpec extended for the key=value/key=value format

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 22baf60 commit 1116648

3 files changed

Lines changed: 331 additions & 50 deletions

File tree

paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java

Lines changed: 124 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,12 @@
3535
import org.apache.paimon.manifest.PartitionEntry;
3636
import org.apache.paimon.options.Options;
3737
import org.apache.paimon.partition.Partition;
38+
import org.apache.paimon.predicate.Equal;
39+
import org.apache.paimon.predicate.In;
40+
import org.apache.paimon.predicate.LeafPredicate;
41+
import org.apache.paimon.predicate.LeafPredicateExtractor;
3842
import org.apache.paimon.predicate.Predicate;
43+
import org.apache.paimon.predicate.PredicateBuilder;
3944
import org.apache.paimon.reader.RecordReader;
4045
import org.apache.paimon.table.FileStoreTable;
4146
import org.apache.paimon.table.ReadonlyTable;
@@ -56,15 +61,19 @@
5661
import org.apache.paimon.utils.InternalRowUtils;
5762
import org.apache.paimon.utils.IteratorRecordReader;
5863
import org.apache.paimon.utils.JsonSerdeUtil;
64+
import org.apache.paimon.utils.PartitionPredicateHelper;
5965
import org.apache.paimon.utils.ProjectedRow;
6066
import org.apache.paimon.utils.SerializationUtils;
6167

6268
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
6369

70+
import javax.annotation.Nullable;
71+
6472
import java.io.IOException;
6573
import java.time.Instant;
6674
import java.time.LocalDateTime;
6775
import java.time.ZoneId;
76+
import java.util.ArrayList;
6877
import java.util.Arrays;
6978
import java.util.Collections;
7079
import java.util.Comparator;
@@ -140,32 +149,95 @@ public Table copy(Map<String, String> dynamicOptions) {
140149

141150
private static class PartitionsScan extends ReadOnceTableScan {
142151

152+
@Nullable private LeafPredicate partitionPredicate;
153+
143154
@Override
144155
public InnerTableScan withFilter(Predicate predicate) {
156+
if (predicate == null) {
157+
return this;
158+
}
159+
160+
Map<String, LeafPredicate> leafPredicates =
161+
predicate.visit(LeafPredicateExtractor.INSTANCE);
162+
this.partitionPredicate = leafPredicates.get("partition");
163+
164+
// Handle Or(Equal, Equal...) pattern from PredicateBuilder.in() with <=20 literals
165+
if (this.partitionPredicate == null) {
166+
for (Predicate andChild : PredicateBuilder.splitAnd(predicate)) {
167+
LeafPredicate inPred = convertOrEqualsToIn(andChild, "partition");
168+
if (inPred != null) {
169+
this.partitionPredicate = inPred;
170+
break;
171+
}
172+
}
173+
}
145174
return this;
146175
}
147176

177+
@Nullable
178+
private static LeafPredicate convertOrEqualsToIn(Predicate predicate, String targetField) {
179+
List<Predicate> orChildren = PredicateBuilder.splitOr(predicate);
180+
if (orChildren.size() <= 1) {
181+
return null;
182+
}
183+
List<Object> literals = new ArrayList<>();
184+
String fieldName = null;
185+
int fieldIndex = -1;
186+
DataType fieldType = null;
187+
for (Predicate child : orChildren) {
188+
if (!(child instanceof LeafPredicate)) {
189+
return null;
190+
}
191+
LeafPredicate leaf = (LeafPredicate) child;
192+
if (!(leaf.function() instanceof Equal)) {
193+
return null;
194+
}
195+
if (fieldName == null) {
196+
fieldName = leaf.fieldName();
197+
fieldIndex = leaf.index();
198+
fieldType = leaf.type();
199+
} else if (!fieldName.equals(leaf.fieldName())) {
200+
return null;
201+
}
202+
literals.addAll(leaf.literals());
203+
}
204+
if (!targetField.equals(fieldName)) {
205+
return null;
206+
}
207+
return new LeafPredicate(In.INSTANCE, fieldType, fieldIndex, fieldName, literals);
208+
}
209+
148210
@Override
149211
public Plan innerPlan() {
150-
return () -> Collections.singletonList(new PartitionsSplit());
212+
return () -> Collections.singletonList(new PartitionsSplit(partitionPredicate));
151213
}
152214
}
153215

154216
private static class PartitionsSplit extends SingletonSplit {
155217

156218
private static final long serialVersionUID = 1L;
157219

220+
@Nullable private final LeafPredicate partitionPredicate;
221+
222+
private PartitionsSplit(@Nullable LeafPredicate partitionPredicate) {
223+
this.partitionPredicate = partitionPredicate;
224+
}
225+
158226
@Override
159227
public boolean equals(Object o) {
160228
if (this == o) {
161229
return true;
162230
}
163-
return o != null && getClass() == o.getClass();
231+
if (o == null || getClass() != o.getClass()) {
232+
return false;
233+
}
234+
PartitionsSplit that = (PartitionsSplit) o;
235+
return Objects.equals(partitionPredicate, that.partitionPredicate);
164236
}
165237

166238
@Override
167239
public int hashCode() {
168-
return 1;
240+
return Objects.hash(partitionPredicate);
169241
}
170242

171243
@Override
@@ -186,7 +258,7 @@ public PartitionsRead(FileStoreTable table) {
186258

187259
@Override
188260
public InnerTableRead withFilter(Predicate predicate) {
189-
// TODO
261+
// filter pushdown is handled at the Scan layer through PartitionsSplit
190262
return this;
191263
}
192264

@@ -207,7 +279,8 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
207279
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
208280
}
209281

210-
List<Partition> partitions = listPartitions();
282+
PartitionsSplit partitionsSplit = (PartitionsSplit) split;
283+
List<Partition> partitions = listPartitions(partitionsSplit.partitionPredicate);
211284

212285
List<DataType> fieldTypes =
213286
fileStoreTable.schema().logicalPartitionType().getFieldTypes();
@@ -320,17 +393,17 @@ private Timestamp toTimestamp(Long epochMillis) {
320393
Instant.ofEpochMilli(epochMillis), ZoneId.systemDefault()));
321394
}
322395

323-
private List<Partition> listPartitions() {
396+
private List<Partition> listPartitions(@Nullable LeafPredicate partitionPredicate) {
324397
CatalogLoader catalogLoader = fileStoreTable.catalogEnvironment().catalogLoader();
325398
if (TimeTravelUtil.hasTimeTravelOptions(new Options(fileStoreTable.options()))
326399
|| catalogLoader == null) {
327-
return listPartitionEntries();
400+
return listPartitionEntries(partitionPredicate);
328401
}
329402

330403
try (Catalog catalog = catalogLoader.load()) {
331404
Identifier baseIdentifier = fileStoreTable.catalogEnvironment().identifier();
332405
if (baseIdentifier == null) {
333-
return listPartitionEntries();
406+
return listPartitionEntries(partitionPredicate);
334407
}
335408
String branch = fileStoreTable.coreOptions().branch();
336409
Identifier identifier;
@@ -343,15 +416,53 @@ private List<Partition> listPartitions() {
343416
} else {
344417
identifier = baseIdentifier;
345418
}
346-
return catalog.listPartitions(identifier);
419+
List<Partition> partitions = catalog.listPartitions(identifier);
420+
return filterByPredicate(partitions, partitionPredicate);
347421
} catch (Exception e) {
348-
return listPartitionEntries();
422+
return listPartitionEntries(partitionPredicate);
349423
}
350424
}
351425

352-
private List<Partition> listPartitionEntries() {
353-
List<PartitionEntry> partitionEntries =
354-
fileStoreTable.newScan().withLevelFilter(level -> true).listPartitionEntries();
426+
private List<Partition> filterByPredicate(
427+
List<Partition> partitions, @Nullable LeafPredicate partitionPredicate) {
428+
if (partitionPredicate == null) {
429+
return partitions;
430+
}
431+
List<String> partitionKeys = fileStoreTable.partitionKeys();
432+
return partitions.stream()
433+
.filter(
434+
p -> {
435+
StringBuilder sb = new StringBuilder();
436+
for (int i = 0; i < partitionKeys.size(); i++) {
437+
if (i > 0) {
438+
sb.append("/");
439+
}
440+
sb.append(partitionKeys.get(i))
441+
.append("=")
442+
.append(p.spec().get(partitionKeys.get(i)));
443+
}
444+
return partitionPredicate.test(
445+
GenericRow.of(BinaryString.fromString(sb.toString())));
446+
})
447+
.collect(Collectors.toList());
448+
}
449+
450+
private List<Partition> listPartitionEntries(@Nullable LeafPredicate partitionPredicate) {
451+
InnerTableScan scan = fileStoreTable.newScan().withLevelFilter(level -> true);
452+
453+
if (partitionPredicate != null) {
454+
List<String> partitionKeys = fileStoreTable.partitionKeys();
455+
RowType partitionType = fileStoreTable.schema().logicalPartitionType();
456+
Predicate partPred =
457+
PartitionPredicateHelper.buildPartitionPredicate(
458+
partitionPredicate, partitionKeys, partitionType);
459+
if (partPred == null) {
460+
return Collections.emptyList();
461+
}
462+
scan.withPartitionFilter(partPred);
463+
}
464+
465+
List<PartitionEntry> partitionEntries = scan.listPartitionEntries();
355466
RowType partitionType = fileStoreTable.schema().logicalPartitionType();
356467
String defaultPartitionName = fileStoreTable.coreOptions().partitionDefaultName();
357468
String[] partitionColumns = fileStoreTable.partitionKeys().toArray(new String[0]);

paimon-core/src/main/java/org/apache/paimon/utils/PartitionPredicateHelper.java

Lines changed: 83 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -36,27 +36,36 @@
3636

3737
/**
3838
* Helper for applying partition predicate pushdown in system tables (BucketsTable, FilesTable,
39-
* FileKeyRangesTable).
39+
* FileKeyRangesTable, PartitionsTable).
4040
*/
4141
public class PartitionPredicateHelper {
4242

43-
public static boolean applyPartitionFilter(
44-
SnapshotReader snapshotReader,
45-
@Nullable LeafPredicate partitionPredicate,
46-
List<String> partitionKeys,
47-
RowType partitionType) {
48-
if (partitionPredicate == null) {
49-
return true;
50-
}
51-
43+
/**
44+
* Build a partition-typed predicate from a string-based leaf predicate on the "partition"
45+
* column.
46+
*
47+
* @return the predicate on partition fields, or {@code null} if the partition spec is invalid
48+
* (indicating no results should be returned)
49+
*/
50+
@Nullable
51+
public static Predicate buildPartitionPredicate(
52+
LeafPredicate partitionPredicate, List<String> partitionKeys, RowType partitionType) {
5253
if (partitionPredicate.function() instanceof Equal) {
5354
LinkedHashMap<String, String> partSpec =
5455
parsePartitionSpec(
5556
partitionPredicate.literals().get(0).toString(), partitionKeys);
5657
if (partSpec == null) {
57-
return false;
58+
return null;
5859
}
59-
snapshotReader.withPartitionFilter(partSpec);
60+
PredicateBuilder partBuilder = new PredicateBuilder(partitionType);
61+
List<Predicate> predicates = new ArrayList<>();
62+
for (int i = 0; i < partitionKeys.size(); i++) {
63+
Object value =
64+
TypeUtils.castFromString(
65+
partSpec.get(partitionKeys.get(i)), partitionType.getTypeAt(i));
66+
predicates.add(partBuilder.equal(i, value));
67+
}
68+
return PredicateBuilder.and(predicates);
6069
} else if (partitionPredicate.function() instanceof In) {
6170
List<Predicate> orPredicates = new ArrayList<>();
6271
PredicateBuilder partBuilder = new PredicateBuilder(partitionType);
@@ -75,51 +84,88 @@ public static boolean applyPartitionFilter(
7584
}
7685
orPredicates.add(PredicateBuilder.and(andPredicates));
7786
}
78-
if (!orPredicates.isEmpty()) {
79-
snapshotReader.withPartitionFilter(PredicateBuilder.or(orPredicates));
80-
}
87+
return orPredicates.isEmpty() ? null : PredicateBuilder.or(orPredicates);
8188
} else if (partitionPredicate.function() instanceof LeafBinaryFunction) {
8289
LinkedHashMap<String, String> partSpec =
8390
parsePartitionSpec(
8491
partitionPredicate.literals().get(0).toString(), partitionKeys);
85-
if (partSpec != null) {
86-
PredicateBuilder partBuilder = new PredicateBuilder(partitionType);
87-
List<Predicate> predicates = new ArrayList<>();
88-
for (int i = 0; i < partitionKeys.size(); i++) {
89-
Object value =
90-
TypeUtils.castFromString(
91-
partSpec.get(partitionKeys.get(i)), partitionType.getTypeAt(i));
92-
predicates.add(
93-
new LeafPredicate(
94-
partitionPredicate.function(),
95-
partitionType.getTypeAt(i),
96-
i,
97-
partitionKeys.get(i),
98-
Collections.singletonList(value)));
99-
}
100-
snapshotReader.withPartitionFilter(PredicateBuilder.and(predicates));
92+
if (partSpec == null) {
93+
return null;
94+
}
95+
PredicateBuilder partBuilder = new PredicateBuilder(partitionType);
96+
List<Predicate> predicates = new ArrayList<>();
97+
for (int i = 0; i < partitionKeys.size(); i++) {
98+
Object value =
99+
TypeUtils.castFromString(
100+
partSpec.get(partitionKeys.get(i)), partitionType.getTypeAt(i));
101+
predicates.add(
102+
new LeafPredicate(
103+
partitionPredicate.function(),
104+
partitionType.getTypeAt(i),
105+
i,
106+
partitionKeys.get(i),
107+
Collections.singletonList(value)));
101108
}
109+
return PredicateBuilder.and(predicates);
110+
}
111+
return null;
112+
}
113+
114+
public static boolean applyPartitionFilter(
115+
SnapshotReader snapshotReader,
116+
@Nullable LeafPredicate partitionPredicate,
117+
List<String> partitionKeys,
118+
RowType partitionType) {
119+
if (partitionPredicate == null) {
120+
return true;
102121
}
103122

123+
Predicate predicate =
124+
buildPartitionPredicate(partitionPredicate, partitionKeys, partitionType);
125+
if (predicate == null) {
126+
return false;
127+
}
128+
snapshotReader.withPartitionFilter(predicate);
104129
return true;
105130
}
106131

107132
@Nullable
108133
public static LinkedHashMap<String, String> parsePartitionSpec(
109134
String partitionStr, List<String> partitionKeys) {
135+
// Handle {value1, value2} format (BucketsTable, FilesTable, FileKeyRangesTable)
110136
if (partitionStr.startsWith("{")) {
111137
partitionStr = partitionStr.substring(1);
138+
if (partitionStr.endsWith("}")) {
139+
partitionStr = partitionStr.substring(0, partitionStr.length() - 1);
140+
}
141+
String[] partFields = partitionStr.split(", ");
142+
if (partitionKeys.size() != partFields.length) {
143+
return null;
144+
}
145+
LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
146+
for (int i = 0; i < partitionKeys.size(); i++) {
147+
partSpec.put(partitionKeys.get(i), partFields[i]);
148+
}
149+
return partSpec;
112150
}
113-
if (partitionStr.endsWith("}")) {
114-
partitionStr = partitionStr.substring(0, partitionStr.length() - 1);
115-
}
116-
String[] partFields = partitionStr.split(", ");
151+
152+
// Handle key=value/key=value format (PartitionsTable)
153+
String[] partFields = partitionStr.split("/");
117154
if (partitionKeys.size() != partFields.length) {
118155
return null;
119156
}
120157
LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
121-
for (int i = 0; i < partitionKeys.size(); i++) {
122-
partSpec.put(partitionKeys.get(i), partFields[i]);
158+
for (String field : partFields) {
159+
int eqIndex = field.indexOf('=');
160+
if (eqIndex < 0) {
161+
return null;
162+
}
163+
partSpec.put(field.substring(0, eqIndex), field.substring(eqIndex + 1));
164+
}
165+
for (String key : partitionKeys) {
166+
if (!partSpec.containsKey(key)) {
167+
return null;
168+
}
123169
}
124170
return partSpec;
125171
}

0 commit comments

Comments
 (0)