Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.utils.TableDiskUsageStatisticUtil;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageIndex.TableDiskUsageIndex;
import org.apache.iotdb.db.storageengine.load.LoadTsFileManager;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
Expand Down Expand Up @@ -686,14 +686,11 @@ private TSStatus loadFileToDataRegion(String filePath, ProgressIndex progressInd
StorageEngine.getInstance().getDataRegion(((DataRegionId) consensusGroupId));
if (region != null) {
TsFileResource resource = generateTsFileResource(filePath, progressIndex);
region.loadNewTsFile(
resource,
true,
false,
true,
region.isTableModel()
? TableDiskUsageStatisticUtil.calculateTableSizeMap(resource)
: Optional.empty());
region.loadNewTsFile(resource, true, false, true, Optional.empty());
if (region.isTableModel()) {
TableDiskUsageIndex.getInstance()
.calculateAndWriteTableSizeMap(region.getDatabaseName(), resource);
}
} else {
// Data region is null indicates that dr has been removed or migrated. In those cases, there
// is no need to replicate data. we just return success to avoid leader keeping retry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,25 @@
package org.apache.iotdb.db.pipe.source.dataregion.realtime;

import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.consensus.pipe.IoTConsensusV2;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeOperator;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
import org.apache.iotdb.db.pipe.source.dataregion.realtime.epoch.TsFileEpoch;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
Expand Down Expand Up @@ -178,6 +184,10 @@ private void extractTsFileInsertion(final PipeRealtimeEvent event) {
// If the insertNode's memory has reached the dangerous threshold, we should not extract any
// tablets.
private boolean canNotUseTabletAnymore(final PipeRealtimeEvent event) {
if (shouldForceUseTsFileForIoTConsensusV2WideTable(event)) {
return true;
}

final long floatingMemoryUsageInByte =
PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName);
final long pipeCount = PipeDataNodeAgent.task().getPipeCount();
Expand Down Expand Up @@ -213,6 +223,76 @@ private boolean canNotUseTabletAnymore(final PipeRealtimeEvent event) {
return mayInsertNodeMemoryReachDangerousThreshold;
}

private boolean shouldForceUseTsFileForIoTConsensusV2WideTable(final PipeRealtimeEvent event) {
if (pipeName == null
|| !pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)
|| !(DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2)
|| !(event.getEvent() instanceof PipeInsertNodeTabletInsertionEvent)) {
return false;
}

final PipeInsertNodeTabletInsertionEvent tabletInsertionEvent =
(PipeInsertNodeTabletInsertionEvent) event.getEvent();
if (!tabletInsertionEvent.isTableModelEvent()) {
return false;
}

final int measurementCount = countMeasurements(tabletInsertionEvent.getInsertNode());
final long estimatedEventSizeInBytes = tabletInsertionEvent.ramBytesUsed();
final PipeConfig pipeConfig = PipeConfig.getInstance();
final int measurementCountThreshold =
pipeConfig.getPipeRealtimeIotConsensusV2ForceTsFileMeasurementCountThreshold();
final long memoryThresholdInBytes =
pipeConfig.getPipeRealtimeIotConsensusV2ForceTsFileMemoryThresholdInBytes();

final boolean isWideTableEvent =
measurementCountThreshold > 0 && measurementCount >= measurementCountThreshold;
final boolean isLargeEvent =
memoryThresholdInBytes > 0 && estimatedEventSizeInBytes >= memoryThresholdInBytes;
if (!isWideTableEvent && !isLargeEvent) {
return false;
}

if (event.maySourceOnlyUseTablets(this)) {
LOGGER.info(
"Pipe task {} in data region {} switches IoTConsensusV2 table-model realtime extraction "
+ "to TsFile for {}, because measurement count is {} (threshold {}) and estimated "
+ "event size is {} bytes (threshold {}).",
pipeName,
dataRegionId,
event.getTsFileEpoch().getFilePath(),
measurementCount,
measurementCountThreshold,
estimatedEventSizeInBytes,
memoryThresholdInBytes);
}
return true;
}

private int countMeasurements(final InsertNode insertNode) {
if (insertNode instanceof InsertRowsNode) {
int maxMeasurementCount = 0;
for (final InsertNode insertRowNode : ((InsertRowsNode) insertNode).getInsertRowNodeList()) {
maxMeasurementCount = Math.max(maxMeasurementCount, countMeasurements(insertRowNode));
}
return maxMeasurementCount;
}
return countMeasurements(insertNode.getMeasurements());
}

private int countMeasurements(final String[] measurements) {
int count = 0;
if (measurements == null) {
return count;
}
for (final String measurement : measurements) {
if (measurement != null) {
++count;
}
}
return count;
}

@Override
public Event supply() {
PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent) pendingQueue.directPoll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.utils.TableDiskUsageStatisticUtil;
import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageIndex.object.EmptyObjectTableSizeIndexReader;
import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageIndex.object.IObjectTableSizeIndexReader;
import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageIndex.tsfile.TsFileTableDiskUsageIndexWriter;
Expand All @@ -38,6 +40,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -153,6 +156,26 @@ public void write(String database, TsFileID tsFileID, Map<String, Long> tableSiz
addOperationToQueue(new WriteOperation(database, tsFileID, tableSizeMap));
}

public void calculateAndWriteTableSizeMap(String database, TsFileResource resource) {
if (database == null || resource == null) {
return;
}
if (failedToRecover) {
return;
}
if (stop) {
LOGGER.warn(
"Skip adding operation {} to queue because TableDiskUsageIndex has been stopped.",
CalculateAndWriteTableSizeMapOperation.class.getSimpleName());
return;
}
if (!queue.offer(new CalculateAndWriteTableSizeMapOperation(database, resource))) {
LOGGER.warn(
"Skip calculating table disk usage for TsFile {} because TableDiskUsageIndex queue is full.",
resource.getTsFilePath());
}
}

public void write(String database, TsFileID originTsFileID, TsFileID newTsFileID) {
addOperationToQueue(new ReplaceTsFileOperation(database, originTsFileID, newTsFileID));
}
Expand Down Expand Up @@ -392,6 +415,26 @@ public void apply(TableDiskUsageIndex tableDiskUsageIndex) throws IOException {
}
}

private static class CalculateAndWriteTableSizeMapOperation extends Operation {

private final TsFileResource resource;

protected CalculateAndWriteTableSizeMapOperation(String database, TsFileResource resource) {
super(database, resource.getTsFileID().regionId);
this.resource = resource;
}

@Override
public void apply(TableDiskUsageIndex tableDiskUsageIndex) throws IOException {
final Optional<Map<String, Long>> tableSizeMap =
TableDiskUsageStatisticUtil.calculateTableSizeMap(resource);
if (tableSizeMap.isPresent() && !tableSizeMap.get().isEmpty()) {
new WriteOperation(database, resource.getTsFileID(), tableSizeMap.get())
.apply(tableDiskUsageIndex);
}
}
}

private static class ReplaceTsFileOperation extends Operation {
private final TsFileID originTsFileID;
private final TsFileID newTsFileID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1971,6 +1971,20 @@ pipe_air_gap_receiver_port=9780
# Datatype: double
pipe_all_sinks_rate_limit_bytes_per_second=-1

# When an IoTConsensusV2 consensus pipe is in stream mode, table-model insertion events whose
# measurement count reaches this threshold will be downgraded to TsFile replication.
# Values less than or equal to 0 disable this measurement-count trigger.
# effectiveMode: restart
# Datatype: int
pipe_realtime_iot_consensus_v2_force_tsfile_measurement_count_threshold=512

# When an IoTConsensusV2 consensus pipe is in stream mode, table-model insertion events whose
# estimated memory reaches this threshold will be downgraded to TsFile replication.
# Values less than or equal to 0 disable this memory trigger.
# effectiveMode: restart
# Datatype: long
pipe_realtime_iot_consensus_v2_force_tsfile_memory_threshold_in_bytes=15728640

####################
### Subscription Consensus Configuration
####################
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ public final class PipeMessages {
"PipeRealtimeForceDowngradingEnabled: {}";
public static final String CONFIG_PIPE_REALTIME_FORCE_DOWNGRADING_PROPORTION =
"PipeRealtimeForceDowngradingProportion: {}";
public static final String
CONFIG_PIPE_REALTIME_IOT_CONSENSUS_V2_FORCE_TSFILE_MEASUREMENT_COUNT_THRESHOLD =
"PipeRealtimeIotConsensusV2ForceTsFileMeasurementCountThreshold: {}";
public static final String
CONFIG_PIPE_REALTIME_IOT_CONSENSUS_V2_FORCE_TSFILE_MEMORY_THRESHOLD_IN_BYTES =
"PipeRealtimeIotConsensusV2ForceTsFileMemoryThresholdInBytes: {}";
public static final String CONFIG_PIPE_SUBTASK_EXECUTOR_MAX_THREAD_NUM =
"PipeSubtaskExecutorMaxThreadNum: {}";
public static final String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ public final class PipeMessages {
"PipeRealtimeForceDowngradingEnabled: {}";
public static final String CONFIG_PIPE_REALTIME_FORCE_DOWNGRADING_PROPORTION =
"PipeRealtimeForceDowngradingProportion: {}";
public static final String
CONFIG_PIPE_REALTIME_IOT_CONSENSUS_V2_FORCE_TSFILE_MEASUREMENT_COUNT_THRESHOLD =
"PipeRealtimeIotConsensusV2ForceTsFileMeasurementCountThreshold: {}";
public static final String
CONFIG_PIPE_REALTIME_IOT_CONSENSUS_V2_FORCE_TSFILE_MEMORY_THRESHOLD_IN_BYTES =
"PipeRealtimeIotConsensusV2ForceTsFileMemoryThresholdInBytes: {}";
public static final String CONFIG_PIPE_SUBTASK_EXECUTOR_MAX_THREAD_NUM =
"PipeSubtaskExecutorMaxThreadNum: {}";
public static final String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ public class CommonConfig {
private int pipeRealTimeQueueMaxWaitingTsFileSize = 1;
private boolean pipeRealtimeForceDowngradingEnabled = true;
private double pipeRealtimeForceDowngradingProportion = 0.25d;
private int pipeRealtimeIotConsensusV2ForceTsFileMeasurementCountThreshold = 512;
private long pipeRealtimeIotConsensusV2ForceTsFileMemoryThresholdInBytes = 15 * MB;

/** The maximum number of threads that can be used to execute subtasks in PipeSubtaskExecutor. */
private int pipeSubtaskExecutorMaxThreadNum =
Expand Down Expand Up @@ -1683,6 +1685,40 @@ public void setPipeRealtimeForceDowngradingProportion(
pipeRealtimeForceDowngradingProportion);
}

public int getPipeRealtimeIotConsensusV2ForceTsFileMeasurementCountThreshold() {
return pipeRealtimeIotConsensusV2ForceTsFileMeasurementCountThreshold;
}

public void setPipeRealtimeIotConsensusV2ForceTsFileMeasurementCountThreshold(
int pipeRealtimeIotConsensusV2ForceTsFileMeasurementCountThreshold) {
if (this.pipeRealtimeIotConsensusV2ForceTsFileMeasurementCountThreshold
== pipeRealtimeIotConsensusV2ForceTsFileMeasurementCountThreshold) {
return;
}
this.pipeRealtimeIotConsensusV2ForceTsFileMeasurementCountThreshold =
pipeRealtimeIotConsensusV2ForceTsFileMeasurementCountThreshold;
logger.info(
"pipeRealtimeIotConsensusV2ForceTsFileMeasurementCountThreshold is set to {}.",
pipeRealtimeIotConsensusV2ForceTsFileMeasurementCountThreshold);
}

public long getPipeRealtimeIotConsensusV2ForceTsFileMemoryThresholdInBytes() {
return pipeRealtimeIotConsensusV2ForceTsFileMemoryThresholdInBytes;
}

public void setPipeRealtimeIotConsensusV2ForceTsFileMemoryThresholdInBytes(
long pipeRealtimeIotConsensusV2ForceTsFileMemoryThresholdInBytes) {
if (this.pipeRealtimeIotConsensusV2ForceTsFileMemoryThresholdInBytes
== pipeRealtimeIotConsensusV2ForceTsFileMemoryThresholdInBytes) {
return;
}
this.pipeRealtimeIotConsensusV2ForceTsFileMemoryThresholdInBytes =
pipeRealtimeIotConsensusV2ForceTsFileMemoryThresholdInBytes;
logger.info(
"pipeRealtimeIotConsensusV2ForceTsFileMemoryThresholdInBytes is set to {}.",
pipeRealtimeIotConsensusV2ForceTsFileMemoryThresholdInBytes);
}

public void setPipeAirGapReceiverEnabled(boolean pipeAirGapReceiverEnabled) {
if (pipeAirGapReceiverEnabled == this.pipeAirGapReceiverEnabled) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ public double getPipeRealtimeForceDowngradingProportion() {
return COMMON_CONFIG.getPipeRealtimeForceDowngradingProportion();
}

public int getPipeRealtimeIotConsensusV2ForceTsFileMeasurementCountThreshold() {
return COMMON_CONFIG.getPipeRealtimeIotConsensusV2ForceTsFileMeasurementCountThreshold();
}

public long getPipeRealtimeIotConsensusV2ForceTsFileMemoryThresholdInBytes() {
return COMMON_CONFIG.getPipeRealtimeIotConsensusV2ForceTsFileMemoryThresholdInBytes();
}

/////////////////////////////// Subtask Executor ///////////////////////////////

public int getPipeSubtaskExecutorMaxThreadNum() {
Expand Down Expand Up @@ -519,6 +527,12 @@ public void printAllConfigs() {
LOGGER.info(
PipeMessages.CONFIG_PIPE_REALTIME_FORCE_DOWNGRADING_PROPORTION,
getPipeRealtimeForceDowngradingProportion());
LOGGER.info(
PipeMessages.CONFIG_PIPE_REALTIME_IOT_CONSENSUS_V2_FORCE_TSFILE_MEASUREMENT_COUNT_THRESHOLD,
getPipeRealtimeIotConsensusV2ForceTsFileMeasurementCountThreshold());
LOGGER.info(
PipeMessages.CONFIG_PIPE_REALTIME_IOT_CONSENSUS_V2_FORCE_TSFILE_MEMORY_THRESHOLD_IN_BYTES,
getPipeRealtimeIotConsensusV2ForceTsFileMemoryThresholdInBytes());

LOGGER.info(
PipeMessages.CONFIG_PIPE_SUBTASK_EXECUTOR_MAX_THREAD_NUM,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,18 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
properties.getProperty(
"pipe_realtime_force_downgrading_proportion",
String.valueOf(config.getPipeRealtimeForceDowngradingProportion()))));
config.setPipeRealtimeIotConsensusV2ForceTsFileMeasurementCountThreshold(
Integer.parseInt(
properties.getProperty(
"pipe_realtime_iot_consensus_v2_force_tsfile_measurement_count_threshold",
String.valueOf(
config.getPipeRealtimeIotConsensusV2ForceTsFileMeasurementCountThreshold()))));
config.setPipeRealtimeIotConsensusV2ForceTsFileMemoryThresholdInBytes(
Long.parseLong(
properties.getProperty(
"pipe_realtime_iot_consensus_v2_force_tsfile_memory_threshold_in_bytes",
String.valueOf(
config.getPipeRealtimeIotConsensusV2ForceTsFileMemoryThresholdInBytes()))));
config.setPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount(
Integer.parseInt(
properties.getProperty(
Expand Down
Loading