Skip to content

Commit 7c93bd7

Browse files
authored
[core] Avoid key bytes OOM in ClusteringFileRewriter.sortAndRewriteFile (#7642)
Avoid key bytes OOM in ClusteringFileRewriter.sortAndRewriteFile. Removing the in-memory List<byte[]> collectedKeys and the batchPutIndex method eliminates the unbounded memory accumulation.
1 parent 6e3a7d3 commit 7c93bd7

File tree

3 files changed

+29
-73
lines changed

3 files changed

+29
-73
lines changed

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,8 @@
4343

4444
import java.io.File;
4545
import java.io.IOException;
46-
import java.util.Collections;
4746
import java.util.List;
4847
import java.util.Optional;
49-
import java.util.Set;
5048
import java.util.concurrent.ExecutionException;
5149
import java.util.concurrent.ExecutorService;
5250
import java.util.stream.IntStream;
@@ -71,7 +69,7 @@ public class ClusteringCompactManager extends CompactFutureManager {
7169
private final RowType keyType;
7270
private final RowType valueType;
7371
private final ExecutorService executor;
74-
private final BucketedDvMaintainer dvMaintainer;
72+
@Nullable private final BucketedDvMaintainer dvMaintainer;
7573
private final boolean lazyGenDeletionFile;
7674
@Nullable private final CompactionMetrics.Reporter metricsReporter;
7775

@@ -89,7 +87,7 @@ public ClusteringCompactManager(
8987
KeyValueFileReaderFactory valueReaderFactory,
9088
KeyValueFileWriterFactory writerFactory,
9189
ExecutorService executor,
92-
BucketedDvMaintainer dvMaintainer,
90+
@Nullable BucketedDvMaintainer dvMaintainer,
9391
boolean lazyGenDeletionFile,
9492
List<DataFileMeta> restoreFiles,
9593
long targetFileSize,
@@ -206,14 +204,8 @@ private CompactResult compact(boolean fullCompaction) throws Exception {
206204
// Snapshot sorted files before Phase 1 to avoid including newly created files in Phase 2
207205
List<DataFileMeta> existingSortedFiles = fileLevels.sortedFiles();
208206
for (DataFileMeta file : unsortedFiles) {
209-
Set<String> originalFileNames = Collections.singleton(file.fileName());
210207
List<DataFileMeta> sortedFiles =
211-
fileRewriter.sortAndRewriteFiles(
212-
singletonList(file),
213-
kvSerializer,
214-
kvSchemaType,
215-
keyIndex,
216-
originalFileNames);
208+
fileRewriter.sortAndRewriteFile(file, kvSerializer, kvSchemaType, keyIndex);
217209
result.before().add(file);
218210
result.after().addAll(sortedFiles);
219211
}

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java

Lines changed: 18 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,13 @@
4848
import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer;
4949
import org.apache.paimon.utils.MutableObjectIterator;
5050

51-
import javax.annotation.Nullable;
52-
5351
import java.io.IOException;
5452
import java.util.ArrayList;
5553
import java.util.Arrays;
5654
import java.util.Collections;
5755
import java.util.Comparator;
5856
import java.util.List;
5957
import java.util.PriorityQueue;
60-
import java.util.Set;
6158

6259
/**
6360
* Handles file rewriting for clustering compaction, including sorting unsorted files (Phase 1) and
@@ -115,20 +112,16 @@ public ClusteringFileRewriter(
115112
}
116113

117114
/**
118-
* Sort and rewrite unsorted files by clustering columns. Reads all KeyValue records, sorts them
115+
* Sort and rewrite unsorted file by clustering columns. Reads all KeyValue records, sorts them
119116
* using an external sort buffer, and writes to new level-1 files. Checks the key index inline
120117
* during writing to handle deduplication (FIRST_ROW skips duplicates, DEDUPLICATE marks old
121-
* positions in DV) and updates the index without re-reading the output files.
122-
*
123-
* @param keyIndex the key index for inline checking and batch update, or null to skip
124-
* @param originalFileNames file names of the original files being replaced (for index check)
118+
* positions in DV) and updates the index.
125119
*/
126-
public List<DataFileMeta> sortAndRewriteFiles(
127-
List<DataFileMeta> inputFiles,
120+
public List<DataFileMeta> sortAndRewriteFile(
121+
DataFileMeta inputFile,
128122
KeyValueSerializer kvSerializer,
129123
RowType kvSchemaType,
130-
@Nullable ClusteringKeyIndex keyIndex,
131-
Set<String> originalFileNames)
124+
ClusteringKeyIndex keyIndex)
132125
throws Exception {
133126
int[] sortFieldsInKeyValue =
134127
Arrays.stream(clusteringColumns)
@@ -146,21 +139,17 @@ public List<DataFileMeta> sortAndRewriteFiles(
146139
MemorySize.MAX_VALUE,
147140
false);
148141

149-
for (DataFileMeta file : inputFiles) {
150-
try (RecordReader<KeyValue> reader = valueReaderFactory.createRecordReader(file)) {
151-
try (CloseableIterator<KeyValue> iterator = reader.toCloseableIterator()) {
152-
while (iterator.hasNext()) {
153-
KeyValue kv = iterator.next();
154-
InternalRow serializedRow = kvSerializer.toRow(kv);
155-
sortBuffer.write(serializedRow);
156-
}
142+
try (RecordReader<KeyValue> reader = valueReaderFactory.createRecordReader(inputFile)) {
143+
try (CloseableIterator<KeyValue> iterator = reader.toCloseableIterator()) {
144+
while (iterator.hasNext()) {
145+
KeyValue kv = iterator.next();
146+
InternalRow serializedRow = kvSerializer.toRow(kv);
147+
sortBuffer.write(serializedRow);
157148
}
158149
}
159150
}
160151

161-
RowCompactedSerializer keySerializer =
162-
keyIndex != null ? new RowCompactedSerializer(keyType) : null;
163-
List<byte[]> collectedKeys = keyIndex != null ? new ArrayList<>() : null;
152+
RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);
164153

165154
RollingFileWriter<KeyValue, DataFileMeta> writer =
166155
writerFactory.createRollingClusteringFileWriter();
@@ -173,12 +162,9 @@ public List<DataFileMeta> sortAndRewriteFiles(
173162
kv.copy(
174163
new InternalRowSerializer(keyType),
175164
new InternalRowSerializer(valueType));
176-
if (keyIndex != null) {
177-
byte[] keyBytes = keySerializer.serializeToBytes(copied.key());
178-
if (!keyIndex.checkKey(keyBytes, originalFileNames)) {
179-
continue;
180-
}
181-
collectedKeys.add(keyBytes);
165+
byte[] keyBytes = keySerializer.serializeToBytes(copied.key());
166+
if (!keyIndex.checkKey(keyBytes)) {
167+
continue;
182168
}
183169
writer.write(copied);
184170
}
@@ -188,23 +174,13 @@ public List<DataFileMeta> sortAndRewriteFiles(
188174
}
189175

190176
List<DataFileMeta> newFiles = writer.result();
191-
for (DataFileMeta file : inputFiles) {
192-
fileLevels.removeFile(file);
193-
}
177+
fileLevels.removeFile(inputFile);
194178
for (DataFileMeta newFile : newFiles) {
195179
fileLevels.addNewFile(newFile);
196180
}
197-
198-
// Batch update index using collected keys, split by file rowCount
199-
if (keyIndex != null) {
200-
int offset = 0;
201-
for (DataFileMeta newFile : newFiles) {
202-
int count = (int) newFile.rowCount();
203-
keyIndex.batchPutIndex(newFile, collectedKeys.subList(offset, offset + count));
204-
offset += count;
205-
}
181+
for (DataFileMeta sortedFile : newFiles) {
182+
keyIndex.rebuildIndex(sortedFile);
206183
}
207-
208184
return newFiles;
209185
}
210186

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import org.apache.paimon.utils.CloseableIterator;
4040
import org.apache.paimon.utils.MutableObjectIterator;
4141

42+
import javax.annotation.Nullable;
43+
4244
import java.io.ByteArrayInputStream;
4345
import java.io.ByteArrayOutputStream;
4446
import java.io.Closeable;
@@ -48,9 +50,9 @@
4850
import java.util.Iterator;
4951
import java.util.List;
5052
import java.util.Map;
51-
import java.util.Set;
5253
import java.util.stream.IntStream;
5354

55+
import static org.apache.paimon.utils.Preconditions.checkNotNull;
5456
import static org.apache.paimon.utils.VarLengthIntUtils.decodeInt;
5557
import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt;
5658

@@ -63,7 +65,7 @@ public class ClusteringKeyIndex implements Closeable {
6365
private final RowType keyType;
6466
private final IOManager ioManager;
6567
private final KeyValueFileReaderFactory keyReaderFactory;
66-
private final BucketedDvMaintainer dvMaintainer;
68+
private final @Nullable BucketedDvMaintainer dvMaintainer;
6769
private final SimpleLsmKvDb kvDb;
6870
private final ClusteringFiles fileLevels;
6971
private final boolean firstRow;
@@ -76,7 +78,7 @@ public ClusteringKeyIndex(
7678
RowType keyType,
7779
IOManager ioManager,
7880
KeyValueFileReaderFactory keyReaderFactory,
79-
BucketedDvMaintainer dvMaintainer,
81+
@Nullable BucketedDvMaintainer dvMaintainer,
8082
SimpleLsmKvDb kvDb,
8183
ClusteringFiles fileLevels,
8284
boolean firstRow,
@@ -216,41 +218,27 @@ public Map.Entry<byte[], byte[]> next() {
216218
* in deletion vectors, return true (write the new record).
217219
*
218220
* @param keyBytes serialized key bytes
219-
* @param originalFileNames file names of the original unsorted files being replaced
220221
* @return true if the record should be written, false to skip (FIRST_ROW dedup)
221222
*/
222-
public boolean checkKey(byte[] keyBytes, Set<String> originalFileNames) throws Exception {
223+
public boolean checkKey(byte[] keyBytes) throws Exception {
223224
byte[] oldValue = kvDb.get(keyBytes);
224225
if (oldValue != null) {
225226
ByteArrayInputStream valueIn = new ByteArrayInputStream(oldValue);
226227
int oldFileId = decodeInt(valueIn);
227228
int oldPosition = decodeInt(valueIn);
228229
DataFileMeta oldFile = fileLevels.getFileById(oldFileId);
229-
if (oldFile != null && !originalFileNames.contains(oldFile.fileName())) {
230+
if (oldFile != null) {
230231
if (firstRow) {
231232
return false;
232233
} else {
234+
checkNotNull(dvMaintainer, "DvMaintainer cannot be null for DEDUPLICATE mode.");
233235
dvMaintainer.notifyNewDeletion(oldFile.fileName(), oldPosition);
234236
}
235237
}
236238
}
237239
return true;
238240
}
239241

240-
/**
241-
* Batch update the key index for a new sorted file using pre-collected key bytes. Avoids
242-
* re-reading the file.
243-
*/
244-
public void batchPutIndex(DataFileMeta sortedFile, List<byte[]> keyBytesList) throws Exception {
245-
int fileId = fileLevels.getFileIdByName(sortedFile.fileName());
246-
for (int position = 0; position < keyBytesList.size(); position++) {
247-
ByteArrayOutputStream value = new ByteArrayOutputStream(8);
248-
encodeInt(value, fileId);
249-
encodeInt(value, position);
250-
kvDb.put(keyBytesList.get(position), value.toByteArray());
251-
}
252-
}
253-
254242
/** Delete key index entries for the given file (only if they still point to it). */
255243
public void deleteIndex(DataFileMeta file) throws Exception {
256244
RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);

0 commit comments

Comments (0)