diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java index 96d4369dcec9d..52d65122890d8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java @@ -22,12 +22,15 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionLastTimeCheckFailedException; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.FollowingBatchCompactionAlignedChunkWriter; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.AlignedPageElement; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.ChunkMetadataElement; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.flushcontroller.AbstractCompactionFlushController; import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter; import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion; +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.exception.write.PageException; import org.apache.tsfile.file.header.PageHeader; import org.apache.tsfile.file.metadata.ChunkMetadata; @@ -63,6 +66,13 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { // The index of the array corresponds to subTaskId. protected int[] chunkPointNumArray = new int[subTaskNum]; + // Each sub task has estimated total size of written points in current chunk. + // The index of the array corresponds to subTaskId. + protected long[] writtenPointTotalSizeArray = new long[subTaskNum]; + + // Whether each sub task's current chunk writer contains TEXT, STRING, BLOB or OBJECT. + protected boolean[] hasVariableLengthTypeArray = new boolean[subTaskNum]; + // used to control the target chunk size protected long targetChunkSize = IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize(); @@ -74,7 +84,12 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { @SuppressWarnings("squid:S1170") private final long checkPoint = (targetChunkPointNum >= 10 ? targetChunkPointNum : 10) / 10; - private long lastCheckIndex = 0; + private final long[] lastCheckIndexArray = new long[subTaskNum]; + + // When estimated size of written points reaches check point, then check chunk size. + private final long writtenPointTotalSizeCheckPoint = Math.max(targetChunkSize / 10, 1L); + + private final long[] lastWrittenPointTotalSizeCheckIndexArray = new long[subTaskNum]; // if unsealed chunk size is lower then this, then deserialize next chunk no matter it is // overlapped or not @@ -115,10 +130,24 @@ public Deletion getTTLLowerBoundForCurrentDevice() { } public void startMeasurement(String measurement, IChunkWriter chunkWriter, int subTaskId) { - lastCheckIndex = 0; + resetChunkWriterStatistics(subTaskId); lastTimeSet[subTaskId] = false; chunkWriters[subTaskId] = chunkWriter; measurementId[subTaskId] = measurement; + hasVariableLengthTypeArray[subTaskId] = containsVariableLengthType(chunkWriter); + } + + private boolean containsVariableLengthType(IChunkWriter chunkWriter) { + if (chunkWriter instanceof ChunkWriterImpl) { + return ((ChunkWriterImpl) chunkWriter).getDataType().isBinary(); + } + AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriter; + for (ValueChunkWriter valueChunkWriter : alignedChunkWriter.getValueChunkWriterList()) { + if (valueChunkWriter.getDataType().isBinary()) { + return true; + } + } + return false; } public abstract void endMeasurement(int subTaskId) throws IOException; @@ -139,7 +168,9 @@ public void startMeasurement(String measurement, IChunkWriter chunkWriter, int s */ public abstract void checkAndMayFlushChunkMetadata() throws IOException; - protected void writeDataPoint(long timestamp, TsPrimitiveType value, IChunkWriter chunkWriter) { + protected void writeDataPoint( + long timestamp, TsPrimitiveType value, IChunkWriter chunkWriter, int subTaskId) { + long writtenPointTotalSize = 0; if (chunkWriter instanceof ChunkWriterImpl) { ChunkWriterImpl chunkWriterImpl = (ChunkWriterImpl) chunkWriter; switch (chunkWriterImpl.getDataType()) { @@ -147,6 +178,7 @@ protected void writeDataPoint(long timestamp, TsPrimitiveType value, IChunkWrite case STRING: case BLOB: chunkWriterImpl.write(timestamp, value.getBinary()); + writtenPointTotalSize += value.getBinary().getLength(); break; case DOUBLE: chunkWriterImpl.write(timestamp, value.getDouble()); @@ -172,7 +204,85 @@ protected void writeDataPoint(long timestamp, TsPrimitiveType value, IChunkWrite } else { AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriter; alignedChunkWriter.write(timestamp, value.getVector()); + if (hasVariableLengthTypeArray[subTaskId]) { + writtenPointTotalSize = estimateWrittenPointTotalSize(value); + } + } + chunkPointNumArray[subTaskId]++; + if (hasVariableLengthTypeArray[subTaskId]) { + writtenPointTotalSizeArray[subTaskId] += writtenPointTotalSize; + } + } + + private long estimateWrittenPointTotalSize(TsPrimitiveType value) { + long size = Long.BYTES; + TsPrimitiveType[] vector = value.getVector(); + for (TsPrimitiveType tsPrimitiveType : vector) { + if (tsPrimitiveType == null) { + continue; + } + TSDataType dataType = tsPrimitiveType.getDataType(); + switch (dataType) { + case TEXT: + case STRING: + case BLOB: + size += tsPrimitiveType.getBinary().getLength(); + break; + case DOUBLE: + case INT64: + case TIMESTAMP: + size += Long.BYTES; + break; + case INT32: + case DATE: + case FLOAT: + size += Integer.BYTES; + break; + case BOOLEAN: + size += 1; + break; + default: + break; + } + } + return size; + } + + protected long estimateWrittenPointTotalSize(TsBlock tsBlock) { + int pointCount = tsBlock.getPositionCount(); + long size = (long) Long.BYTES * pointCount; + Column[] columns = tsBlock.getValueColumns(); + for (Column column : columns) { + TSDataType dataType = column.getDataType(); + if (dataType.isBinary()) { + for (int j = 0; j < pointCount; j++) { + if (column.isNull(j)) { + continue; + } + size += column.getBinary(j).getLength(); + } + continue; + } + // This is only used as a checkpoint estimate, so fixed-width values use count directly. + switch (dataType) { + case DOUBLE: + case INT64: + case TIMESTAMP: + size += (long) Long.BYTES * pointCount; + break; + case INT32: + case DATE: + case FLOAT: + size += (long) Integer.BYTES * pointCount; + break; + case BOOLEAN: + size += pointCount; + break; + default: + break; + } } + return size; } @SuppressWarnings("squid:S2445") @@ -182,7 +292,14 @@ protected void sealChunk( synchronized (targetWriter) { targetWriter.writeChunk(chunkWriter); } + resetChunkWriterStatistics(subTaskId); + } + + private void resetChunkWriterStatistics(int subTaskId) { chunkPointNumArray[subTaskId] = 0; + writtenPointTotalSizeArray[subTaskId] = 0; + lastCheckIndexArray[subTaskId] = 0; + lastWrittenPointTotalSizeCheckIndexArray[subTaskId] = 0; } public abstract boolean flushNonAlignedChunk( @@ -204,7 +321,7 @@ protected void flushNonAlignedChunkToFileWriter( synchronized (targetWriter) { // seal last chunk to file writer targetWriter.writeChunk(chunkWriters[subTaskId]); - chunkPointNumArray[subTaskId] = 0; + resetChunkWriterStatistics(subTaskId); targetWriter.writeChunk(chunk, chunkMetadata); } } @@ -222,7 +339,7 @@ protected void flushAlignedChunkToFileWriter( AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriters[subTaskId]; // seal last chunk to file writer targetWriter.writeChunk(alignedChunkWriter); - chunkPointNumArray[subTaskId] = 0; + resetChunkWriterStatistics(subTaskId); targetWriter.markStartingWritingAligned(); @@ -269,6 +386,9 @@ protected void flushNonAlignedPageToChunkWriter( chunkWriter.writePageHeaderAndDataIntoBuff(compressedPageData, pageHeader); chunkPointNumArray[subTaskId] += pageHeader.getStatistics().getCount(); + if (hasVariableLengthTypeArray[subTaskId]) { + writtenPointTotalSizeArray[subTaskId] += pageHeader.getSerializedPageSize(); + } } public abstract boolean flushAlignedPage(AlignedPageElement alignedPageElement, int subTaskId) @@ -293,29 +413,51 @@ protected void flushAlignedPageToChunkWriter( // flush new time page to chunk writer directly alignedChunkWriter.writePageHeaderAndDataIntoTimeBuff(compressedTimePageData, timePageHeader); + long writtenValuePageSize = 0; // flush new value pages to chunk writer directly for (int i = 0; i < valuePageHeaders.size(); i++) { - if (valuePageHeaders.get(i) == null) { + PageHeader valuePageHeader = valuePageHeaders.get(i); + if (valuePageHeader == null) { // sub sensor does not exist in current file or value page has been deleted completely alignedChunkWriter.getValueChunkWriterByIndex(i).writeEmptyPageToPageBuffer(); continue; } alignedChunkWriter.writePageHeaderAndDataIntoValueBuff( - compressedValuePageDatas.get(i), valuePageHeaders.get(i), i); + compressedValuePageDatas.get(i), valuePageHeader, i); + if (hasVariableLengthTypeArray[subTaskId]) { + writtenValuePageSize += valuePageHeader.getSerializedPageSize(); + } } chunkPointNumArray[subTaskId] += timePageHeader.getStatistics().getCount(); + if (hasVariableLengthTypeArray[subTaskId]) { + // Direct-flushed pages are already serialized, so use page size as checkpoint estimate. + writtenPointTotalSizeArray[subTaskId] += + timePageHeader.getSerializedPageSize() + writtenValuePageSize; + } } protected void checkChunkSizeAndMayOpenANewChunk( CompactionTsFileWriter fileWriter, IChunkWriter chunkWriter, int subTaskId) throws IOException { - if (chunkPointNumArray[subTaskId] >= (lastCheckIndex + 1) * checkPoint) { - // if chunk point num reaches the check point, then check if the chunk size over threshold - lastCheckIndex = chunkPointNumArray[subTaskId] / checkPoint; + if (chunkWriter instanceof FollowingBatchCompactionAlignedChunkWriter + && chunkWriter.checkIsChunkSizeOverThreshold(targetChunkSize, targetChunkPointNum, false)) { + sealChunk(fileWriter, chunkWriter, subTaskId); + return; + } + boolean reachesPointCheckPoint = + chunkPointNumArray[subTaskId] >= (lastCheckIndexArray[subTaskId] + 1) * checkPoint; + boolean reachesSizeCheckPoint = + hasVariableLengthTypeArray[subTaskId] + && writtenPointTotalSizeArray[subTaskId] + >= (lastWrittenPointTotalSizeCheckIndexArray[subTaskId] + 1) + * writtenPointTotalSizeCheckPoint; + if (reachesPointCheckPoint || reachesSizeCheckPoint) { + lastCheckIndexArray[subTaskId] = chunkPointNumArray[subTaskId] / checkPoint; + lastWrittenPointTotalSizeCheckIndexArray[subTaskId] = + writtenPointTotalSizeArray[subTaskId] / writtenPointTotalSizeCheckPoint; if (chunkWriter.checkIsChunkSizeOverThreshold(targetChunkSize, targetChunkPointNum, false)) { sealChunk(fileWriter, chunkWriter, subTaskId); - lastCheckIndex = 0; } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java index d630d5e70f4af..777e77ab7d169 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java @@ -133,8 +133,7 @@ public void write(TimeValuePair timeValuePair, int subTaskId) throws IOException checkTimeAndMayFlushChunkToCurrentFile(timestamp, subTaskId); int fileIndex = seqFileIndexArray[subTaskId]; - writeDataPoint(timestamp, value, chunkWriters[subTaskId]); - chunkPointNumArray[subTaskId]++; + writeDataPoint(timestamp, value, chunkWriters[subTaskId], subTaskId); checkChunkSizeAndMayOpenANewChunk( targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId); isDeviceExistedInTargetFiles[fileIndex] = true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java index c460f7ede285f..77b7ea4dec25c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java @@ -115,8 +115,8 @@ public void endMeasurement(int subTaskId) throws IOException { @Override public void write(TimeValuePair timeValuePair, int subTaskId) throws IOException { checkPreviousTimestamp(timeValuePair.getTimestamp(), subTaskId); - writeDataPoint(timeValuePair.getTimestamp(), timeValuePair.getValue(), chunkWriters[subTaskId]); - chunkPointNumArray[subTaskId]++; + writeDataPoint( + timeValuePair.getTimestamp(), timeValuePair.getValue(), chunkWriters[subTaskId], subTaskId); checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], subTaskId); lastTime[subTaskId] = timeValuePair.getTimestamp(); lastTimeSet[subTaskId] = true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/FastInnerCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/FastInnerCompactionWriter.java index 6519101781ccf..daf66ef3dcb14 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/FastInnerCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/FastInnerCompactionWriter.java @@ -182,6 +182,8 @@ public boolean flushAlignedPage(AlignedPageElement alignedPageElement, int subTa valuePageHeaders, subTaskId); + checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], subTaskId); + lastTime[subTaskId] = timePageHeader.getEndTime(); lastTimeSet[subTaskId] = true; return true; @@ -218,6 +220,8 @@ public boolean flushBatchedValuePage( valuePageHeaders, subTaskId); + checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], subTaskId); + lastTime[subTaskId] = timePageHeader.getEndTime(); lastTimeSet[subTaskId] = true; return true; @@ -228,10 +232,12 @@ public boolean flushBatchedValuePage( * successfully or not. Return false if the unsealed page is too small or the end time of page * exceeds the end time of file, else return true. * + * @throws IOException if io errors occurred * @throws PageException if errors occurred when write data page header */ public boolean flushNonAlignedPage( - ByteBuffer compressedPageData, PageHeader pageHeader, int subTaskId) throws PageException { + ByteBuffer compressedPageData, PageHeader pageHeader, int subTaskId) + throws IOException, PageException { checkPreviousTimestamp(pageHeader.getStartTime(), subTaskId); boolean isUnsealedPageOverThreshold = chunkWriters[subTaskId].checkIsUnsealedPageOverThreshold( @@ -244,6 +250,8 @@ public boolean flushNonAlignedPage( flushNonAlignedPageToChunkWriter( (ChunkWriterImpl) chunkWriters[subTaskId], compressedPageData, pageHeader, subTaskId); + checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], subTaskId); + lastTime[subTaskId] = pageHeader.getEndTime(); lastTimeSet[subTaskId] = true; return true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java index 231bd753d7731..e5812b1c315f2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java @@ -56,13 +56,16 @@ public void write(TsBlock tsBlock, int subTaskId) throws IOException { checkTimeAndMayFlushChunkToCurrentFile(timestamps.getStartTime(), subTaskId); AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) this.chunkWriters[subTaskId]; chunkWriter.write(timestamps, columns, batchSize); + chunkPointNumArray[subTaskId] += batchSize; + if (hasVariableLengthTypeArray[subTaskId]) { + writtenPointTotalSizeArray[subTaskId] += estimateWrittenPointTotalSize(tsBlock); + } synchronized (this) { // we need to synchronized here to avoid multi-thread competition in sub-task TsFileResource resource = targetResources.get(seqFileIndexArray[subTaskId]); resource.updateStartTime(deviceId, timestamps.getStartTime()); resource.updateEndTime(deviceId, timestamps.getEndTime()); } - chunkPointNumArray[subTaskId] += timestamps.getTimes().length; checkChunkSizeAndMayOpenANewChunk( targetFileWriters.get(seqFileIndexArray[subTaskId]), chunkWriter, subTaskId); isDeviceExistedInTargetFiles[seqFileIndexArray[subTaskId]] = true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java index 2e85124f8f587..a67a0b562b7a9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java @@ -54,7 +54,10 @@ public void write(TsBlock tsBlock, int subTaskId) throws IOException { int batchSize = tsBlock.getPositionCount(); AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) this.chunkWriters[subTaskId]; chunkWriter.write(timestamps, columns, batchSize); - chunkPointNumArray[subTaskId] += timestamps.getTimes().length; + chunkPointNumArray[subTaskId] += batchSize; + if (hasVariableLengthTypeArray[subTaskId]) { + writtenPointTotalSizeArray[subTaskId] += estimateWrittenPointTotalSize(tsBlock); + } checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriter, subTaskId); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/RepairUnsortedFileCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/RepairUnsortedFileCompactionWriter.java index 5d46eb50ab9be..d8465ee1a305f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/RepairUnsortedFileCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/RepairUnsortedFileCompactionWriter.java @@ -74,8 +74,8 @@ public void endMeasurement(int subTaskId) throws IOException { } private void writeToChunkWriter(TimeValuePair timeValuePair, int subTaskId) throws IOException { - writeDataPoint(timeValuePair.getTimestamp(), timeValuePair.getValue(), chunkWriters[subTaskId]); - chunkPointNumArray[subTaskId]++; + writeDataPoint( + timeValuePair.getTimestamp(), timeValuePair.getValue(), chunkWriters[subTaskId], subTaskId); checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], subTaskId); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriterTest.java new file mode 100644 index 0000000000000..ee8a54daef3eb --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriterTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer; + +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.AlignedPageElement; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.ChunkMetadataElement; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.flushcontroller.AbstractCompactionFlushController; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.PageException; +import org.apache.tsfile.file.header.PageHeader; +import org.apache.tsfile.file.metadata.ChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.statistics.Statistics; +import org.apache.tsfile.read.TimeValuePair; +import org.apache.tsfile.read.common.Chunk; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.write.chunk.ChunkWriterImpl; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class AbstractCompactionWriterTest { + + private static final int SUB_TASK_ID = 0; + + @Test + public void testBinarySizeCheckpointTriggersChunkSizeCheckBeforePointCheckpoint() + throws IOException, PageException { + TestCompactionWriter compactionWriter = new TestCompactionWriter(); + CountingChunkWriter chunkWriter = new CountingChunkWriter(); + PageHeader pageHeader = + createPageHeader(compactionWriter.getCompressedSizeToReachSizeCheckpoint()); + + compactionWriter.startMeasurement("s1", chunkWriter, SUB_TASK_ID); + compactionWriter.flushNonAlignedPageToChunkWriter( + chunkWriter, ByteBuffer.allocate(0), pageHeader, SUB_TASK_ID); + compactionWriter.checkChunkSizeAndMayOpenANewChunk(null, chunkWriter, SUB_TASK_ID); + + Assert.assertEquals(1, chunkWriter.chunkSizeCheckCount); + } + + private static PageHeader createPageHeader(int compressedSize) { + Statistics statistics = Statistics.getStatsByType(TSDataType.TEXT); + return new PageHeader(compressedSize, compressedSize, statistics); + } + + private static class CountingChunkWriter extends ChunkWriterImpl { + + private int chunkSizeCheckCount; + + private CountingChunkWriter() { + super(new MeasurementSchema("s1", TSDataType.TEXT)); + } + + @Override + public boolean checkIsChunkSizeOverThreshold( + long size, long pointNum, boolean returnTrueIfChunkEmpty) { + chunkSizeCheckCount++; + return false; + } + } + + private static class TestCompactionWriter extends AbstractCompactionWriter { + + private int getCompressedSizeToReachSizeCheckpoint() { + return (int) Math.max(targetChunkSize / 10, 1L); + } + + @Override + public void startChunkGroup(IDeviceID deviceId, boolean isAlign) {} + + @Override + public void endChunkGroup() {} + + @Override + public void endMeasurement(int subTaskId) {} + + @Override + public void write(TimeValuePair timeValuePair, int subTaskId) {} + + @Override + public void write(TsBlock tsBlock, int subTaskId) {} + + @Override + public void endFile() {} + + @Override + public long getWriterSize() { + return 0; + } + + @Override + public void checkAndMayFlushChunkMetadata() {} + + @Override + public boolean flushNonAlignedChunk(Chunk chunk, ChunkMetadata chunkMetadata, int subTaskId) { + return false; + } + + @Override + public boolean flushAlignedChunk(ChunkMetadataElement chunkMetadataElement, int subTaskId) { + return false; + } + + @Override + public boolean flushBatchedValueChunk( + ChunkMetadataElement chunkMetadataElement, + int subTaskId, + AbstractCompactionFlushController flushController) { + return false; + } + + @Override + public boolean flushNonAlignedPage( + ByteBuffer compressedPageData, PageHeader pageHeader, int subTaskId) { + return false; + } + + @Override + public boolean flushAlignedPage(AlignedPageElement alignedPageElement, int subTaskId) { + return false; + } + + @Override + public boolean flushBatchedValuePage( + AlignedPageElement alignedPageElement, + int subTaskId, + AbstractCompactionFlushController flushController) { + return false; + } + + @Override + public void close() {} + } +}