From 7fc307bd41af07dd225db971cec0ddcb88b60281 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 17 Jun 2026 11:23:15 +0800 Subject: [PATCH] Fix tree schema snapshot database creation on master (#17964) (cherry picked from commit dd3153e4038aabfe7e07bfe2e50fc4209b472f10) --- .../thrift/IoTDBDataNodeReceiver.java | 61 +++++++++++++++++-- .../thrift/IoTDBDataNodeReceiverTest.java | 10 +++ 2 files changed, 67 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 8a5c4ad060fa6..a837126cfa40e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -83,6 +83,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement; import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper; import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil; @@ -536,17 +537,28 @@ static LoadTsFileStatement buildLoadTsFileStatementForSync( private TSStatus loadSchemaSnapShot( final Map parameters, final List fileAbsolutePaths) throws IllegalPathException, IOException { + final String databaseName = parameters.get(ColumnHeaderConstant.DATABASE); + final PartialPath databasePath = new PartialPath(databaseName); + final PipePattern pipePattern = + PipePattern.parsePatternFromString( + parameters.get(ColumnHeaderConstant.PATH_PATTERN), IoTDBPipePattern::new); + + if (!shouldLoadTreeSchemaSnapshotDatabase(pipePattern, databaseName)) { + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } + final TSStatus createDatabaseStatus = createSchemaSnapshotDatabaseIfNecessary(databasePath); + if (createDatabaseStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return createDatabaseStatus; + } + final SRStatementGenerator generator = SchemaRegionSnapshotParser.translate2Statements( Paths.get(fileAbsolutePaths.get(0)), fileAbsolutePaths.size() > 1 ? Paths.get(fileAbsolutePaths.get(1)) : null, - new PartialPath(parameters.get(ColumnHeaderConstant.DATABASE))); + databasePath); final Set executionTypes = PipeSchemaRegionSnapshotEvent.getStatementTypeSet( parameters.get(ColumnHeaderConstant.TYPE)); - final PipePattern pipePattern = - PipePattern.parsePatternFromString( - parameters.get(ColumnHeaderConstant.PATH_PATTERN), IoTDBPipePattern::new); // Clear to avoid previous exceptions batchVisitor.clear(); @@ -571,6 +583,47 @@ private TSStatus loadSchemaSnapShot( return PipeReceiverStatusHandler.getPriorStatus(results); } + static boolean shouldLoadTreeSchemaSnapshotDatabase( + final String pathPattern, + final boolean isTreeModelDataAllowedToBeCaptured, + final String databaseName) { + return isTreeModelDataAllowedToBeCaptured + && shouldLoadTreeSchemaSnapshotDatabase( + PipePattern.parsePatternFromString(pathPattern, IoTDBPipePattern::new), databaseName); + } + + private static boolean shouldLoadTreeSchemaSnapshotDatabase( + final PipePattern pipePattern, final String databaseName) { + return pipePattern.mayOverlapWithDb(databaseName); + } + + private TSStatus createSchemaSnapshotDatabaseIfNecessary(final PartialPath databasePath) { + final DatabaseSchemaStatement statement = + new DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE); + statement.setDatabasePath(databasePath); + + final TSStatus status = executeStatementAndClassifyExceptions(statement); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; + } + + if (status.getCode() == TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { + return Objects.equals( + status.getMessage(), + databasePath.getFullPath() + " has already been created as database") + ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS) + : new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) + .setMessage(status.getMessage()); + } + + if (status.getCode() == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) { + return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) + .setMessage(status.getMessage()); + } + + return status; + } + private TPipeTransferResp handleTransferSchemaPlan(final PipeTransferPlanNodeReq req) { // We may be able to skip the alter logical view's exception parsing because // the "AlterLogicalViewNode" is itself idempotent diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java index 1e279a18febfb..7f9197dfa04e7 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java @@ -110,6 +110,16 @@ public void testRepeatedStatementExceptionLogIsReduced() throws Exception { } } + @Test + public void testTreeSchemaSnapshotDatabaseIsFilteredByPattern() { + Assert.assertTrue( + IoTDBDataNodeReceiver.shouldLoadTreeSchemaSnapshotDatabase("root.ln.**", true, "root.ln")); + Assert.assertFalse( + IoTDBDataNodeReceiver.shouldLoadTreeSchemaSnapshotDatabase("root.ln.**", true, "root.db")); + Assert.assertFalse( + IoTDBDataNodeReceiver.shouldLoadTreeSchemaSnapshotDatabase("root.ln.**", false, "root.ln")); + } + @Test public void testLoadTsFileSyncStatementVerifiesSchemaWhenConvertingType() throws Exception { final Path tsFile = Files.createTempFile("pipe-load-convert-verify-schema", ".tsfile");