diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index 33c49bc8ca0..d4c60aa7420 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -641,7 +641,12 @@ public void start() throws Exception { if (memoryUsage != null) { memoryUsage.start(); } - + if (systemUsage.getStoreUsage() != null) { + systemUsage.getStoreUsage().start(); + } + if (systemUsage.getTempUsage() != null) { + systemUsage.getTempUsage().start(); + } if (getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination())) { scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod()); } @@ -658,6 +663,12 @@ public void stop() throws Exception { if (memoryUsage != null) { memoryUsage.stop(); } + if (systemUsage.getStoreUsage() != null) { + systemUsage.getStoreUsage().stop(); + } + if (this.systemUsage.getTempUsage() != null) { + this.systemUsage.getTempUsage().stop(); + } if (this.topicStore != null) { this.topicStore.stop(); } diff --git a/activemq-client/src/main/java/org/apache/activemq/usage/Usage.java b/activemq-client/src/main/java/org/apache/activemq/usage/Usage.java index f075187a515..9216e84b46d 100644 --- a/activemq-client/src/main/java/org/apache/activemq/usage/Usage.java +++ b/activemq-client/src/main/java/org/apache/activemq/usage/Usage.java @@ -381,6 +381,10 @@ protected void removeChild(T child) { children.remove(child); } + protected List getChildren() { + return children; + } + /** * @param callback * @return true if the UsageManager was full. The callback will only be called if this method returns true. diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageLimitsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageLimitsTest.java index 70daba92baa..4cfe3d52b69 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageLimitsTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageLimitsTest.java @@ -17,10 +17,15 @@ package org.apache.activemq.usage; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.activemq.ConfigurationException; +import org.apache.activemq.broker.region.BaseDestination; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; import org.junit.Test; import org.apache.activemq.broker.BrokerService; @@ -32,6 +37,7 @@ import org.apache.logging.log4j.core.filter.AbstractFilter; import org.apache.logging.log4j.core.layout.MessageLayout; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import org.junit.experimental.categories.Category; @@ -114,4 +120,67 @@ public void append(LogEvent event) { logger.removeAppender(appender); } + + @Test + public void testUsageLimitUpdatesAppliedToDests() throws Exception { + BrokerService brokerService = createBroker(); + try { + // prevent any other dests from being created + brokerService.setAdvisorySupport(false); + final SystemUsage systemUsage = brokerService.getSystemUsage(); + final StoreUsage storeUsage = systemUsage.getStoreUsage(); + final TempUsage tempUsage = systemUsage.getTempUsage(); + storeUsage.setLimit(4096); + tempUsage.setLimit(1024); + brokerService.start(); + brokerService.waitUntilStarted(); + + // On startup there will be no destinations created so storeUsage and + // systemUsage for the broker should have no children + assertTrue(storeUsage.getChildren().isEmpty()); + assertTrue(tempUsage.getChildren().isEmpty()); + + // Create a topic and a queue. This should cause both destinations to call start() + // and create child usage trackers that will get registered with the parent + ActiveMQTopic topic = new ActiveMQTopic("test.topic"); + ActiveMQQueue queue = new ActiveMQQueue("test.queue"); + brokerService.setDestinations(new ActiveMQDestination[]{topic, queue}); + List dests = List.of((BaseDestination) brokerService.getDestination(topic), + (BaseDestination) brokerService.getDestination(queue)); + + // verify both broker usage trackers have 2 children now + assertEquals(2, storeUsage.getChildren().size()); + assertEquals(2, tempUsage.getChildren().size()); + + // the limits for each destination should match the parent + for (BaseDestination dest : dests) { + assertEquals(4096, dest.getSystemUsage().getStoreUsage().getLimit()); + assertEquals(1024, dest.getSystemUsage().getTempUsage().getLimit()); + } + + // Update the parent, this will trigger a callback to update to tell any + // registered children usage trackers to update their limit + storeUsage.setLimit(8192); + tempUsage.setLimit(2048); + + // Verify the children trackers on each dest were correctly updated + for (BaseDestination dest : dests) { + assertEquals(8192, dest.getSystemUsage().getStoreUsage().getLimit()); + assertEquals(2048, dest.getSystemUsage().getTempUsage().getLimit()); + } + + // Remove each dest and verify that after removal the stop() method properly + // unregisters the tracker from the parent so we prevent leaks + brokerService.removeDestination(topic); + assertEquals(1, storeUsage.getChildren().size()); + assertEquals(1, tempUsage.getChildren().size()); + + brokerService.removeDestination(queue); + assertTrue(storeUsage.getChildren().isEmpty()); + assertTrue(tempUsage.getChildren().isEmpty()); + } finally { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + } }