diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java index b467b4733476a..9ef0fbf0f30f9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java @@ -76,7 +76,8 @@ public Scheduler( this.releaseFlushStrategy = releaseFlushStrategy; } - private void executeFlush(CachedMTreeStore store, int regionId, AtomicInteger remainToFlush) { + private void executeFlush( + CachedMTreeStore store, int regionId, AtomicInteger remainToFlush, boolean propagateFailure) { IMemoryManager memoryManager = store.getMemoryManager(); ISchemaFile file = store.getSchemaFile(); LockManager lockManager = store.getLockManager(); @@ -97,6 +98,9 @@ private void executeFlush(CachedMTreeStore store, int regionId, AtomicInteger re regionId, e.getMessage(), e); + if (propagateFailure) { + throw new RuntimeException(e); + } } finally { long time = System.currentTimeMillis() - startTime; if (time > 10_000) { @@ -145,22 +149,26 @@ public synchronized CompletableFuture scheduleFlushAll() { CompletableFuture.runAsync( () -> { int regionId = entry.getKey(); - CachedMTreeStore store = entry.getValue(); - if (store == null) { - // store has been closed - return; - } - LockManager lockManager = store.getLockManager(); - lockManager.globalReadLock(); - if (!regionToStore.containsKey(regionId)) { - // double check store have not been closed - return; - } try { - executeFlush(store, regionId, null); - executeRelease(store, false); + CachedMTreeStore store = entry.getValue(); + if (store == null) { + // store has been closed + return; + } + LockManager lockManager = store.getLockManager(); + lockManager.globalReadLock(); + try { + if (!regionToStore.containsKey(regionId)) { + // double check store have not been closed + return; + } + executeFlush(store, regionId, null, true); + executeRelease(store, false); + } finally { + lockManager.globalReadUnlock(); + } } finally { - lockManager.globalReadUnlock(); + flushingRegionSet.remove(regionId); } }, workerPool)) @@ -221,22 +229,25 @@ public synchronized void scheduleFlush(List regionIds) { flushingRegionSet.add(regionId); workerPool.submit( () -> { - CachedMTreeStore store = regionToStore.get(regionId); - if (store == null) { - // store has been closed - return; - } - LockManager lockManager = store.getLockManager(); - lockManager.globalReadLock(); - if (!regionToStore.containsKey(regionId)) { - // double check store have not been closed - return; - } try { - - executeFlush(store, regionId, remainToFlush); + CachedMTreeStore store = regionToStore.get(regionId); + if (store == null) { + // store has been closed + return; + } + LockManager lockManager = store.getLockManager(); + lockManager.globalReadLock(); + try { + if (!regionToStore.containsKey(regionId)) { + // double check store have not been closed + return; + } + executeFlush(store, regionId, remainToFlush, false); + } finally { + lockManager.globalReadUnlock(); + } } finally { - lockManager.globalReadUnlock(); + flushingRegionSet.remove(regionId); } }); if (remainToFlush.get() <= 0) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java index 06d8c279844dd..56e0b5a6dc20a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java @@ -292,8 +292,6 @@ private void writeUpdatedChildren(ICachedMNode node, SchemaPageContext cxt) .entrySet()) { child = entry.getValue(); actualAddress = getTargetSegmentAddress(curSegAddr, entry.getKey(), cxt); - childBuffer = RecordUtils.node2Buffer(child); - curPage = getPageInstance(SchemaFile.getPageIndex(actualAddress), cxt); if (curPage.getAsSegmentedPage().read(SchemaFile.getSegIndex(actualAddress), entry.getKey()) == null) { @@ -302,6 +300,13 @@ private void writeUpdatedChildren(ICachedMNode node, SchemaPageContext cxt) "Node[%s] has no child[%s] in pbtree file.", node.getName(), entry.getKey())); } + if (!child.isMeasurement() && getNodeAddress(child) < 0) { + short estSegSize = estimateSegmentSize(child); + long glbIndex = preAllocateSegment(estSegSize, cxt); + SchemaFile.setNodeAddress(child, glbIndex); + } + childBuffer = RecordUtils.node2Buffer(child); + // prepare alias comparison if (child.isMeasurement() && child.getAsMeasurementMNode().getAlias() != null) { alias = child.getAsMeasurementMNode().getAlias(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java index 75a8ab8bc0000..bb25ea10092c6 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java @@ -315,6 +315,39 @@ public void test2KMeasurement() throws MetadataException, IOException { sf.close(); } + @Test + public void testFlushUpdatedChildWithNegativeSegmentAddress() throws Exception { + ISchemaFile sf = SchemaFile.initSchemaFile("root.sg", TEST_SCHEMA_REGION_ID); + ICachedMNode sgNode = nodeFactory.createDatabaseDeviceMNode(null, "sg").getAsMNode(); + ICachedMNode device = nodeFactory.createDeviceMNode(sgNode, "d1").getAsMNode(); + sgNode.addChild(device); + + writeMNodeInTest(sf, sgNode); + + // Typical flush order: the parent already has this child record on disk, while the updated + // child object in memory may still have no valid segment address. + ICachedMNodeContainer.getCachedMNodeContainer(device).setSegmentAddress(-1L); + addNodeToUpdateBuffer(sgNode, device); + writeMNodeInTest(sf, sgNode); + + Assert.assertTrue(getSegAddrInContainer(device) >= 0); + + device.addChild(getMeasurementNode(device, "s1", "alias_s1")); + writeMNodeInTest(sf, device); + Assert.assertEquals( + "alias_s1", sf.getChildNode(device, "s1").getAsMeasurementMNode().getAlias()); + + long deviceSegmentAddress = getSegAddrInContainer(device); + sf.close(); + + sf = SchemaFile.loadSchemaFile("root.sg", TEST_SCHEMA_REGION_ID); + ICachedMNode loadedDevice = sf.getChildNode(sgNode, "d1"); + Assert.assertEquals(deviceSegmentAddress, getSegAddrInContainer(loadedDevice)); + Assert.assertEquals( + "alias_s1", sf.getChildNode(loadedDevice, "s1").getAsMeasurementMNode().getAlias()); + sf.close(); + } + @Test public void testMassiveSegment() throws MetadataException, IOException { ICachedMNode dbNode = nodeFactory.createDatabaseDeviceMNode(null, "sgRoot");