Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,12 @@ public void start() throws Exception {
if (memoryUsage != null) {
memoryUsage.start();
}

if (systemUsage.getStoreUsage() != null) {
systemUsage.getStoreUsage().start();
}
if (systemUsage.getTempUsage() != null) {
systemUsage.getTempUsage().start();
}
if (getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination())) {
scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod());
}
Expand All @@ -658,6 +663,12 @@ public void stop() throws Exception {
if (memoryUsage != null) {
memoryUsage.stop();
}
if (systemUsage.getStoreUsage() != null) {
systemUsage.getStoreUsage().stop();
}
if (this.systemUsage.getTempUsage() != null) {
this.systemUsage.getTempUsage().stop();
}
if (this.topicStore != null) {
this.topicStore.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,10 @@ protected void removeChild(T child) {
children.remove(child);
}

protected List<T> getChildren() {
return children;
}

/**
* @param callback
* @return true if the UsageManager was full. The callback will only be called if this method returns true.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@

package org.apache.activemq.usage;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.ConfigurationException;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Test;

import org.apache.activemq.broker.BrokerService;
Expand All @@ -32,6 +37,7 @@
import org.apache.logging.log4j.core.filter.AbstractFilter;
import org.apache.logging.log4j.core.layout.MessageLayout;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.experimental.categories.Category;
Expand Down Expand Up @@ -114,4 +120,67 @@ public void append(LogEvent event) {

logger.removeAppender(appender);
}

@Test
public void testUsageLimitUpdatesAppliedToDests() throws Exception {
BrokerService brokerService = createBroker();
try {
// prevent any other dests from being created
brokerService.setAdvisorySupport(false);
final SystemUsage systemUsage = brokerService.getSystemUsage();
final StoreUsage storeUsage = systemUsage.getStoreUsage();
final TempUsage tempUsage = systemUsage.getTempUsage();
storeUsage.setLimit(4096);
tempUsage.setLimit(1024);
brokerService.start();
brokerService.waitUntilStarted();

// On startup there will be no destinations created so storeUsage and
// systemUsage for the broker should have no children
assertTrue(storeUsage.getChildren().isEmpty());
assertTrue(tempUsage.getChildren().isEmpty());

// Create a topic and a queue. This should cause both destinations to call start()
// and create child usage trackers that will get registered with the parent
ActiveMQTopic topic = new ActiveMQTopic("test.topic");
ActiveMQQueue queue = new ActiveMQQueue("test.queue");
brokerService.setDestinations(new ActiveMQDestination[]{topic, queue});
List<BaseDestination> dests = List.of((BaseDestination) brokerService.getDestination(topic),
(BaseDestination) brokerService.getDestination(queue));

// verify both broker usage trackers have 2 children now
assertEquals(2, storeUsage.getChildren().size());
assertEquals(2, tempUsage.getChildren().size());

// the limits for each destination should match the parent
for (BaseDestination dest : dests) {
assertEquals(4096, dest.getSystemUsage().getStoreUsage().getLimit());
assertEquals(1024, dest.getSystemUsage().getTempUsage().getLimit());
}

// Update the parent, this will trigger a callback to update to tell any
// registered children usage trackers to update their limit
storeUsage.setLimit(8192);
tempUsage.setLimit(2048);

// Verify the children trackers on each dest were correctly updated
for (BaseDestination dest : dests) {
assertEquals(8192, dest.getSystemUsage().getStoreUsage().getLimit());
assertEquals(2048, dest.getSystemUsage().getTempUsage().getLimit());
}

// Remove each dest and verify that after removal the stop() method properly
// unregisters the tracker from the parent so we prevent leaks
brokerService.removeDestination(topic);
assertEquals(1, storeUsage.getChildren().size());
assertEquals(1, tempUsage.getChildren().size());

brokerService.removeDestination(queue);
assertTrue(storeUsage.getChildren().isEmpty());
assertTrue(tempUsage.getChildren().isEmpty());
} finally {
brokerService.stop();
brokerService.waitUntilStopped();
}
}
}