Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionLastTimeCheckFailedException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.FollowingBatchCompactionAlignedChunkWriter;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.AlignedPageElement;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.ChunkMetadataElement;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.flushcontroller.AbstractCompactionFlushController;
import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter;
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;

import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.PageException;
import org.apache.tsfile.file.header.PageHeader;
import org.apache.tsfile.file.metadata.ChunkMetadata;
Expand Down Expand Up @@ -63,6 +66,13 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
// The index of the array corresponds to subTaskId.
protected int[] chunkPointNumArray = new int[subTaskNum];

// Each sub task has estimated total size of written points in current chunk.
// The index of the array corresponds to subTaskId.
protected long[] writtenPointTotalSizeArray = new long[subTaskNum];

// Whether each sub task's current chunk writer contains TEXT, STRING, BLOB or OBJECT.
protected boolean[] hasVariableLengthTypeArray = new boolean[subTaskNum];

// used to control the target chunk size
protected long targetChunkSize = IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();

Expand All @@ -74,7 +84,12 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
@SuppressWarnings("squid:S1170")
private final long checkPoint = (targetChunkPointNum >= 10 ? targetChunkPointNum : 10) / 10;

private long lastCheckIndex = 0;
private final long[] lastCheckIndexArray = new long[subTaskNum];

// When estimated size of written points reaches check point, then check chunk size.
private final long writtenPointTotalSizeCheckPoint = Math.max(targetChunkSize / 10, 1L);

private final long[] lastWrittenPointTotalSizeCheckIndexArray = new long[subTaskNum];

// if unsealed chunk size is lower than this, then deserialize next chunk no matter whether it is
// overlapped or not
Expand Down Expand Up @@ -115,10 +130,24 @@ public Deletion getTTLLowerBoundForCurrentDevice() {
}

/**
 * Binds a chunk writer and a measurement name to the given sub task before its data is written.
 *
 * <p>The per-sub-task chunk statistics and the last-time flag are cleared, and the sub task is
 * marked as to whether the supplied chunk writer holds any binary-typed series.
 *
 * @param measurement name stored in {@code measurementId[subTaskId]}
 * @param chunkWriter writer stored in {@code chunkWriters[subTaskId]}
 * @param subTaskId index of the sub task in the per-sub-task arrays
 */
public void startMeasurement(String measurement, IChunkWriter chunkWriter, int subTaskId) {
// restart checkpoint tracking for the new measurement
lastCheckIndex = 0;
resetChunkWriterStatistics(subTaskId);
lastTimeSet[subTaskId] = false;
chunkWriters[subTaskId] = chunkWriter;
measurementId[subTaskId] = measurement;
// remember whether size-based checkpoints are needed for this writer
hasVariableLengthTypeArray[subTaskId] = containsVariableLengthType(chunkWriter);
}

/**
 * Tells whether the given chunk writer holds at least one series whose data type reports
 * {@code isBinary()}.
 *
 * <p>A {@code ChunkWriterImpl} is judged by its single data type. Any other writer is cast to
 * {@code AlignedChunkWriterImpl} and judged by its value chunk writers.
 *
 * @param chunkWriter the chunk writer to inspect
 * @return {@code true} if any series of the writer is binary-typed, otherwise {@code false}
 */
private boolean containsVariableLengthType(IChunkWriter chunkWriter) {
  if (!(chunkWriter instanceof ChunkWriterImpl)) {
    // Everything that is not a single-series writer is handled as an aligned writer.
    boolean foundBinaryColumn = false;
    for (ValueChunkWriter columnWriter :
        ((AlignedChunkWriterImpl) chunkWriter).getValueChunkWriterList()) {
      if (columnWriter.getDataType().isBinary()) {
        // one binary column is enough, no need to look at the rest
        foundBinaryColumn = true;
        break;
      }
    }
    return foundBinaryColumn;
  }
  ChunkWriterImpl singleSeriesWriter = (ChunkWriterImpl) chunkWriter;
  return singleSeriesWriter.getDataType().isBinary();
}

public abstract void endMeasurement(int subTaskId) throws IOException;
Expand All @@ -139,14 +168,17 @@ public void startMeasurement(String measurement, IChunkWriter chunkWriter, int s
*/
public abstract void checkAndMayFlushChunkMetadata() throws IOException;

protected void writeDataPoint(long timestamp, TsPrimitiveType value, IChunkWriter chunkWriter) {
protected void writeDataPoint(
long timestamp, TsPrimitiveType value, IChunkWriter chunkWriter, int subTaskId) {
long writtenPointTotalSize = 0;
if (chunkWriter instanceof ChunkWriterImpl) {
ChunkWriterImpl chunkWriterImpl = (ChunkWriterImpl) chunkWriter;
switch (chunkWriterImpl.getDataType()) {
case TEXT:
case STRING:
case BLOB:
chunkWriterImpl.write(timestamp, value.getBinary());
writtenPointTotalSize += value.getBinary().getLength();
break;
case DOUBLE:
chunkWriterImpl.write(timestamp, value.getDouble());
Expand All @@ -172,7 +204,85 @@ protected void writeDataPoint(long timestamp, TsPrimitiveType value, IChunkWrite
} else {
AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriter;
alignedChunkWriter.write(timestamp, value.getVector());
if (hasVariableLengthTypeArray[subTaskId]) {
writtenPointTotalSize = estimateWrittenPointTotalSize(value);
}
}
chunkPointNumArray[subTaskId]++;
if (hasVariableLengthTypeArray[subTaskId]) {
writtenPointTotalSizeArray[subTaskId] += writtenPointTotalSize;
}
}

/**
 * Estimates the raw size in bytes of one aligned row: one timestamp plus every non-null value.
 *
 * <p>Binary values are recognised with {@code TSDataType.isBinary()}, the same predicate that
 * decides whether size tracking is enabled for a chunk writer and that the {@code TsBlock}
 * overload uses. This keeps the estimate from dropping to zero for a binary type that is not
 * TEXT, STRING or BLOB. Fixed-width values contribute their primitive width; types not listed
 * contribute nothing.
 *
 * @param value a vector value whose {@code getVector()} holds one entry per value column; entries
 *     may be {@code null}
 * @return the estimated number of bytes written for this row
 */
private long estimateWrittenPointTotalSize(TsPrimitiveType value) {
  // every row carries exactly one timestamp
  long size = Long.BYTES;
  for (TsPrimitiveType columnValue : value.getVector()) {
    if (columnValue == null) {
      // nothing is counted for a column without a value in this row
      continue;
    }
    TSDataType dataType = columnValue.getDataType();
    if (dataType.isBinary()) {
      // variable-length value: count its actual payload length
      size += columnValue.getBinary().getLength();
      continue;
    }
    switch (dataType) {
      case DOUBLE:
      case INT64:
      case TIMESTAMP:
        size += Long.BYTES;
        break;
      case INT32:
      case DATE:
      case FLOAT:
        size += Integer.BYTES;
        break;
      case BOOLEAN:
        size += 1;
        break;
      default:
        break;
    }
  }
  return size;
}

/**
 * Estimates the raw size in bytes of all points held by a {@code TsBlock}.
 *
 * <p>The estimate is one timestamp per position plus, for each value column, either the summed
 * length of its non-null binary values or a fixed width multiplied by the position count. Nulls
 * in fixed-width columns are not subtracted, since the result only serves as a checkpoint
 * estimate.
 *
 * @param tsBlock the block whose written size is estimated
 * @return the estimated number of bytes
 */
protected long estimateWrittenPointTotalSize(TsBlock tsBlock) {
  final int rowCount = tsBlock.getPositionCount();
  // time column: one long per row
  long estimatedBytes = (long) Long.BYTES * rowCount;
  for (Column valueColumn : tsBlock.getValueColumns()) {
    TSDataType columnType = valueColumn.getDataType();
    if (columnType.isBinary()) {
      // variable-length column: add up the real payload of every non-null row
      for (int row = 0; row < rowCount; row++) {
        if (!valueColumn.isNull(row)) {
          estimatedBytes += valueColumn.getBinary(row).getLength();
        }
      }
    } else {
      // fixed-width column: width per value times number of rows
      int bytesPerValue;
      switch (columnType) {
        case DOUBLE:
        case INT64:
        case TIMESTAMP:
          bytesPerValue = Long.BYTES;
          break;
        case INT32:
        case DATE:
        case FLOAT:
          bytesPerValue = Integer.BYTES;
          break;
        case BOOLEAN:
          bytesPerValue = 1;
          break;
        default:
          bytesPerValue = 0;
          break;
      }
      estimatedBytes += (long) bytesPerValue * rowCount;
    }
  }
  return estimatedBytes;
}

@SuppressWarnings("squid:S2445")
Expand All @@ -182,7 +292,14 @@ protected void sealChunk(
synchronized (targetWriter) {
targetWriter.writeChunk(chunkWriter);
}
resetChunkWriterStatistics(subTaskId);
}

/**
 * Clears everything tracked for the current chunk of one sub task: the accumulated point count
 * and estimated size, and the checkpoint indexes derived from each of them.
 *
 * @param subTaskId index of the sub task in the per-sub-task arrays
 */
private void resetChunkWriterStatistics(int subTaskId) {
  // point-count tracking
  lastCheckIndexArray[subTaskId] = 0;
  chunkPointNumArray[subTaskId] = 0;
  // estimated-size tracking
  lastWrittenPointTotalSizeCheckIndexArray[subTaskId] = 0;
  writtenPointTotalSizeArray[subTaskId] = 0;
}

public abstract boolean flushNonAlignedChunk(
Expand All @@ -204,7 +321,7 @@ protected void flushNonAlignedChunkToFileWriter(
synchronized (targetWriter) {
// seal last chunk to file writer
targetWriter.writeChunk(chunkWriters[subTaskId]);
chunkPointNumArray[subTaskId] = 0;
resetChunkWriterStatistics(subTaskId);
targetWriter.writeChunk(chunk, chunkMetadata);
}
}
Expand All @@ -222,7 +339,7 @@ protected void flushAlignedChunkToFileWriter(
AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriters[subTaskId];
// seal last chunk to file writer
targetWriter.writeChunk(alignedChunkWriter);
chunkPointNumArray[subTaskId] = 0;
resetChunkWriterStatistics(subTaskId);

targetWriter.markStartingWritingAligned();

Expand Down Expand Up @@ -269,6 +386,9 @@ protected void flushNonAlignedPageToChunkWriter(
chunkWriter.writePageHeaderAndDataIntoBuff(compressedPageData, pageHeader);

chunkPointNumArray[subTaskId] += pageHeader.getStatistics().getCount();
if (hasVariableLengthTypeArray[subTaskId]) {
writtenPointTotalSizeArray[subTaskId] += pageHeader.getSerializedPageSize();
}
}

public abstract boolean flushAlignedPage(AlignedPageElement alignedPageElement, int subTaskId)
Expand All @@ -293,29 +413,51 @@ protected void flushAlignedPageToChunkWriter(
// flush new time page to chunk writer directly
alignedChunkWriter.writePageHeaderAndDataIntoTimeBuff(compressedTimePageData, timePageHeader);

long writtenValuePageSize = 0;
// flush new value pages to chunk writer directly
for (int i = 0; i < valuePageHeaders.size(); i++) {
if (valuePageHeaders.get(i) == null) {
PageHeader valuePageHeader = valuePageHeaders.get(i);
if (valuePageHeader == null) {
// sub sensor does not exist in current file or value page has been deleted completely
alignedChunkWriter.getValueChunkWriterByIndex(i).writeEmptyPageToPageBuffer();
continue;
}
alignedChunkWriter.writePageHeaderAndDataIntoValueBuff(
compressedValuePageDatas.get(i), valuePageHeaders.get(i), i);
compressedValuePageDatas.get(i), valuePageHeader, i);
if (hasVariableLengthTypeArray[subTaskId]) {
writtenValuePageSize += valuePageHeader.getSerializedPageSize();
}
}

chunkPointNumArray[subTaskId] += timePageHeader.getStatistics().getCount();
if (hasVariableLengthTypeArray[subTaskId]) {
// Direct-flushed pages are already serialized, so use page size as checkpoint estimate.
writtenPointTotalSizeArray[subTaskId] +=
timePageHeader.getSerializedPageSize() + writtenValuePageSize;
}
}

protected void checkChunkSizeAndMayOpenANewChunk(
CompactionTsFileWriter fileWriter, IChunkWriter chunkWriter, int subTaskId)
throws IOException {
if (chunkPointNumArray[subTaskId] >= (lastCheckIndex + 1) * checkPoint) {
// if chunk point num reaches the check point, then check if the chunk size over threshold
lastCheckIndex = chunkPointNumArray[subTaskId] / checkPoint;
if (chunkWriter instanceof FollowingBatchCompactionAlignedChunkWriter
&& chunkWriter.checkIsChunkSizeOverThreshold(targetChunkSize, targetChunkPointNum, false)) {
sealChunk(fileWriter, chunkWriter, subTaskId);
return;
}
boolean reachesPointCheckPoint =
chunkPointNumArray[subTaskId] >= (lastCheckIndexArray[subTaskId] + 1) * checkPoint;
boolean reachesSizeCheckPoint =
hasVariableLengthTypeArray[subTaskId]
&& writtenPointTotalSizeArray[subTaskId]
>= (lastWrittenPointTotalSizeCheckIndexArray[subTaskId] + 1)
* writtenPointTotalSizeCheckPoint;
if (reachesPointCheckPoint || reachesSizeCheckPoint) {
lastCheckIndexArray[subTaskId] = chunkPointNumArray[subTaskId] / checkPoint;
lastWrittenPointTotalSizeCheckIndexArray[subTaskId] =
writtenPointTotalSizeArray[subTaskId] / writtenPointTotalSizeCheckPoint;
if (chunkWriter.checkIsChunkSizeOverThreshold(targetChunkSize, targetChunkPointNum, false)) {
sealChunk(fileWriter, chunkWriter, subTaskId);
lastCheckIndex = 0;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ public void write(TimeValuePair timeValuePair, int subTaskId) throws IOException

checkTimeAndMayFlushChunkToCurrentFile(timestamp, subTaskId);
int fileIndex = seqFileIndexArray[subTaskId];
writeDataPoint(timestamp, value, chunkWriters[subTaskId]);
chunkPointNumArray[subTaskId]++;
writeDataPoint(timestamp, value, chunkWriters[subTaskId], subTaskId);
checkChunkSizeAndMayOpenANewChunk(
targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId);
isDeviceExistedInTargetFiles[fileIndex] = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ public void endMeasurement(int subTaskId) throws IOException {
@Override
public void write(TimeValuePair timeValuePair, int subTaskId) throws IOException {
checkPreviousTimestamp(timeValuePair.getTimestamp(), subTaskId);
writeDataPoint(timeValuePair.getTimestamp(), timeValuePair.getValue(), chunkWriters[subTaskId]);
chunkPointNumArray[subTaskId]++;
writeDataPoint(
timeValuePair.getTimestamp(), timeValuePair.getValue(), chunkWriters[subTaskId], subTaskId);
checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], subTaskId);
lastTime[subTaskId] = timeValuePair.getTimestamp();
lastTimeSet[subTaskId] = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ public boolean flushAlignedPage(AlignedPageElement alignedPageElement, int subTa
valuePageHeaders,
subTaskId);

checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], subTaskId);

lastTime[subTaskId] = timePageHeader.getEndTime();
lastTimeSet[subTaskId] = true;
return true;
Expand Down Expand Up @@ -218,6 +220,8 @@ public boolean flushBatchedValuePage(
valuePageHeaders,
subTaskId);

checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], subTaskId);

lastTime[subTaskId] = timePageHeader.getEndTime();
lastTimeSet[subTaskId] = true;
return true;
Expand All @@ -228,10 +232,12 @@ public boolean flushBatchedValuePage(
* successfully or not. Return false if the unsealed page is too small or the end time of page
* exceeds the end time of file, else return true.
*
* @throws IOException if io errors occurred
* @throws PageException if errors occurred when write data page header
*/
public boolean flushNonAlignedPage(
ByteBuffer compressedPageData, PageHeader pageHeader, int subTaskId) throws PageException {
ByteBuffer compressedPageData, PageHeader pageHeader, int subTaskId)
throws IOException, PageException {
checkPreviousTimestamp(pageHeader.getStartTime(), subTaskId);
boolean isUnsealedPageOverThreshold =
chunkWriters[subTaskId].checkIsUnsealedPageOverThreshold(
Expand All @@ -244,6 +250,8 @@ public boolean flushNonAlignedPage(
flushNonAlignedPageToChunkWriter(
(ChunkWriterImpl) chunkWriters[subTaskId], compressedPageData, pageHeader, subTaskId);

checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], subTaskId);

lastTime[subTaskId] = pageHeader.getEndTime();
lastTimeSet[subTaskId] = true;
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,16 @@ public void write(TsBlock tsBlock, int subTaskId) throws IOException {
checkTimeAndMayFlushChunkToCurrentFile(timestamps.getStartTime(), subTaskId);
AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) this.chunkWriters[subTaskId];
chunkWriter.write(timestamps, columns, batchSize);
chunkPointNumArray[subTaskId] += batchSize;
if (hasVariableLengthTypeArray[subTaskId]) {
writtenPointTotalSizeArray[subTaskId] += estimateWrittenPointTotalSize(tsBlock);
}
synchronized (this) {
// we need to synchronized here to avoid multi-thread competition in sub-task
TsFileResource resource = targetResources.get(seqFileIndexArray[subTaskId]);
resource.updateStartTime(deviceId, timestamps.getStartTime());
resource.updateEndTime(deviceId, timestamps.getEndTime());
}
chunkPointNumArray[subTaskId] += timestamps.getTimes().length;
checkChunkSizeAndMayOpenANewChunk(
targetFileWriters.get(seqFileIndexArray[subTaskId]), chunkWriter, subTaskId);
isDeviceExistedInTargetFiles[seqFileIndexArray[subTaskId]] = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ public void write(TsBlock tsBlock, int subTaskId) throws IOException {
int batchSize = tsBlock.getPositionCount();
AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) this.chunkWriters[subTaskId];
chunkWriter.write(timestamps, columns, batchSize);
chunkPointNumArray[subTaskId] += timestamps.getTimes().length;
chunkPointNumArray[subTaskId] += batchSize;
if (hasVariableLengthTypeArray[subTaskId]) {
writtenPointTotalSizeArray[subTaskId] += estimateWrittenPointTotalSize(tsBlock);
}
checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriter, subTaskId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ public void endMeasurement(int subTaskId) throws IOException {
}

private void writeToChunkWriter(TimeValuePair timeValuePair, int subTaskId) throws IOException {
writeDataPoint(timeValuePair.getTimestamp(), timeValuePair.getValue(), chunkWriters[subTaskId]);
chunkPointNumArray[subTaskId]++;
writeDataPoint(
timeValuePair.getTimestamp(), timeValuePair.getValue(), chunkWriters[subTaskId], subTaskId);
checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], subTaskId);
}

Expand Down
Loading
Loading