From a2791d2a89e9f95b3f383a934f8ddea6c1ebe44a Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon" Date: Thu, 2 Jul 2026 15:38:55 -0400 Subject: [PATCH] Fix Topic store/temp usage tracking This fixes a bug that prevented broker usage limit updates from being applied to topics. Previousy topics did not call start() on the usage trackers for the store or temp store which means those trackers didn't get registered with the parent tracker. Registering with the parent is required in order to get notified of updates. Queues already properly called start/stop to register and deregister the usage trackers so this commit just brings Topics in line with Queues and adds a test to verify the trackers now correctly register themselves with the broker's parent tracker and the limits are applied on updates. Lastly, the the test verifies the trackers are cleaned up correctly on destination deletion. --- .../apache/activemq/broker/region/Topic.java | 13 +++- .../java/org/apache/activemq/usage/Usage.java | 4 ++ .../activemq/usage/StoreUsageLimitsTest.java | 69 +++++++++++++++++++ 3 files changed, 85 insertions(+), 1 deletion(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index 33c49bc8ca0..d4c60aa7420 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -641,7 +641,12 @@ public void start() throws Exception { if (memoryUsage != null) { memoryUsage.start(); } - + if (systemUsage.getStoreUsage() != null) { + systemUsage.getStoreUsage().start(); + } + if (systemUsage.getTempUsage() != null) { + systemUsage.getTempUsage().start(); + } if (getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination())) { scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod()); } @@ -658,6 +663,12 @@ public void stop() throws Exception { if (memoryUsage != null) { memoryUsage.stop(); } + if (systemUsage.getStoreUsage() != null) { + systemUsage.getStoreUsage().stop(); + } + if (this.systemUsage.getTempUsage() != null) { + this.systemUsage.getTempUsage().stop(); + } if (this.topicStore != null) { this.topicStore.stop(); } diff --git a/activemq-client/src/main/java/org/apache/activemq/usage/Usage.java b/activemq-client/src/main/java/org/apache/activemq/usage/Usage.java index f075187a515..9216e84b46d 100644 --- a/activemq-client/src/main/java/org/apache/activemq/usage/Usage.java +++ b/activemq-client/src/main/java/org/apache/activemq/usage/Usage.java @@ -381,6 +381,10 @@ protected void removeChild(T child) { children.remove(child); } + protected List getChildren() { + return children; + } + /** * @param callback * @return true if the UsageManager was full. The callback will only be called if this method returns true. diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageLimitsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageLimitsTest.java index 70daba92baa..4cfe3d52b69 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageLimitsTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageLimitsTest.java @@ -17,10 +17,15 @@ package org.apache.activemq.usage; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.activemq.ConfigurationException; +import org.apache.activemq.broker.region.BaseDestination; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; import org.junit.Test; import org.apache.activemq.broker.BrokerService; @@ -32,6 +37,7 @@ import org.apache.logging.log4j.core.filter.AbstractFilter; import org.apache.logging.log4j.core.layout.MessageLayout; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import org.junit.experimental.categories.Category; @@ -114,4 +120,67 @@ public void append(LogEvent event) { logger.removeAppender(appender); } + + @Test + public void testUsageLimitUpdatesAppliedToDests() throws Exception { + BrokerService brokerService = createBroker(); + try { + // prevent any other dests from being created + brokerService.setAdvisorySupport(false); + final SystemUsage systemUsage = brokerService.getSystemUsage(); + final StoreUsage storeUsage = systemUsage.getStoreUsage(); + final TempUsage tempUsage = systemUsage.getTempUsage(); + storeUsage.setLimit(4096); + tempUsage.setLimit(1024); + brokerService.start(); + brokerService.waitUntilStarted(); + + // On startup there will be no destinations created so storeUsage and + // systemUsage for the broker should have no children + assertTrue(storeUsage.getChildren().isEmpty()); + assertTrue(tempUsage.getChildren().isEmpty()); + + // Create a topic and a queue. This should cause both destinations to call start() + // and create child usage trackers that will get registered with the parent + ActiveMQTopic topic = new ActiveMQTopic("test.topic"); + ActiveMQQueue queue = new ActiveMQQueue("test.queue"); + brokerService.setDestinations(new ActiveMQDestination[]{topic, queue}); + List dests = List.of((BaseDestination) brokerService.getDestination(topic), + (BaseDestination) brokerService.getDestination(queue)); + + // verify both broker usage trackers have 2 children now + assertEquals(2, storeUsage.getChildren().size()); + assertEquals(2, tempUsage.getChildren().size()); + + // the limits for each destination should match the parent + for (BaseDestination dest : dests) { + assertEquals(4096, dest.getSystemUsage().getStoreUsage().getLimit()); + assertEquals(1024, dest.getSystemUsage().getTempUsage().getLimit()); + } + + // Update the parent, this will trigger a callback to update to tell any + // registered children usage trackers to update their limit + storeUsage.setLimit(8192); + tempUsage.setLimit(2048); + + // Verify the children trackers on each dest were correctly updated + for (BaseDestination dest : dests) { + assertEquals(8192, dest.getSystemUsage().getStoreUsage().getLimit()); + assertEquals(2048, dest.getSystemUsage().getTempUsage().getLimit()); + } + + // Remove each dest and verify that after removal the stop() method properly + // unregisters the tracker from the parent so we prevent leaks + brokerService.removeDestination(topic); + assertEquals(1, storeUsage.getChildren().size()); + assertEquals(1, tempUsage.getChildren().size()); + + brokerService.removeDestination(queue); + assertTrue(storeUsage.getChildren().isEmpty()); + assertTrue(tempUsage.getChildren().isEmpty()); + } finally { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + } }