@@ -255,11 +255,29 @@ private void initializeQueue() {
255255 defaultTaskExecutor = true ;
256256 taskExecutor = createDefaultTaskExecutor (queueDetails );
257257 } else {
258- initializeThreadMap (queueDetails , taskExecutor , false , queueDetails . size () );
258+ initializeThreadMapForNonDefaultExecutor (queueDetails );
259259 }
260260 initializeRunningQueueState ();
261261 }
262262
263+ private void initializeThreadMapForNonDefaultExecutor (
264+ List <QueueDetail > registeredActiveQueueDetail ) {
265+ List <QueueDetail > queueDetails =
266+ registeredActiveQueueDetail .stream ()
267+ .filter (e -> !e .isSystemGenerated ())
268+ .collect (Collectors .toList ());
269+ List <QueueDetail > withoutConcurrency = new ArrayList <>();
270+ for (QueueDetail queueDetail : queueDetails ) {
271+ if (queueDetail .getConcurrency ().isValid ()) {
272+ addExecutorForConcurrencyBasedQueue (queueDetail , taskExecutor , false );
273+ } else {
274+ withoutConcurrency .add (queueDetail );
275+ }
276+ }
277+ initializeThreadMap (
278+ withoutConcurrency , taskExecutor , false , getWorkersCount (withoutConcurrency .size ()));
279+ }
280+
263281 private void initialize () {
264282 initializeQueue ();
265283 this .postProcessingHandler =
@@ -296,9 +314,12 @@ private void initializeThreadMap(
296314 AsyncTaskExecutor taskExecutor ,
297315 boolean defaultExecutor ,
298316 int workersCount ) {
317+ if (queueDetails .isEmpty ()) {
318+ return ;
319+ }
320+ QueueThreadPool pool = new QueueThreadPool (taskExecutor , defaultExecutor , workersCount );
299321 for (QueueDetail queueDetail : queueDetails ) {
300- queueThreadMap .put (
301- queueDetail .getName (), new QueueThreadPool (taskExecutor , defaultExecutor , workersCount ));
322+ queueThreadMap .put (queueDetail .getName (), pool );
302323 }
303324 }
304325
@@ -332,16 +353,19 @@ private AsyncTaskExecutor createNonConcurrencyBasedExecutor(
332353 return executor ;
333354 }
334355
356+ private void addExecutorForConcurrencyBasedQueue (
357+ QueueDetail queueDetail , AsyncTaskExecutor executor , boolean defaultTaskExecutor ) {
358+ int maxJobs = queueDetail .getConcurrency ().getMax ();
359+ QueueThreadPool threadPool = new QueueThreadPool (executor , defaultTaskExecutor , maxJobs );
360+ queueThreadMap .put (queueDetail .getName (), threadPool );
361+ }
362+
335363 private void createExecutor (QueueDetail queueDetail ) {
336364 Concurrency concurrency = queueDetail .getConcurrency ();
337- int queueCapacity = 0 ;
338- int maxJobs = concurrency .getMax ();
339365 int corePoolSize = concurrency .getMin ();
340366 int maxPoolSize = concurrency .getMax ();
341- AsyncTaskExecutor executor =
342- createTaskExecutor (queueDetail , corePoolSize , maxPoolSize , queueCapacity );
343- QueueThreadPool threadPool = new QueueThreadPool (executor , true , maxJobs );
344- queueThreadMap .put (queueDetail .getName (), threadPool );
367+ AsyncTaskExecutor executor = createTaskExecutor (queueDetail , corePoolSize , maxPoolSize );
368+ addExecutorForConcurrencyBasedQueue (queueDetail , executor , true );
345369 }
346370
347371 public AsyncTaskExecutor createDefaultTaskExecutor (
@@ -362,15 +386,14 @@ public AsyncTaskExecutor createDefaultTaskExecutor(
362386 }
363387
364388 private AsyncTaskExecutor createTaskExecutor (
365- QueueDetail queueDetail , int corePoolSize , int maxPoolSize , int queueCapacity ) {
389+ QueueDetail queueDetail , int corePoolSize , int maxPoolSize ) {
366390 String name = ThreadUtils .getWorkerName (queueDetail .getName ());
367- return ThreadUtils .createTaskExecutor (
368- name , name + "-" , corePoolSize , maxPoolSize , queueCapacity );
391+ return ThreadUtils .createTaskExecutor (name , name + "-" , corePoolSize , maxPoolSize , 0 );
369392 }
370393
371394 private List <QueueDetail > getQueueDetail (String queue , MappingInformation mappingInformation ) {
372395 int numRetry = mappingInformation .getNumRetry ();
373- if (!mappingInformation .getDeadLetterQueueName (). isEmpty ( ) && numRetry == -1 ) {
396+ if (!StringUtils . isEmpty ( mappingInformation .getDeadLetterQueueName ()) && numRetry == -1 ) {
374397 log .warn (
375398 "Dead letter queue {} is set but retry is not set" ,
376399 mappingInformation .getDeadLetterQueueName ());
@@ -403,12 +426,13 @@ private List<QueueDetail> getQueueDetail(String queue, MappingInformation mappin
403426 .priority (priority )
404427 .priorityGroup (priorityGroup )
405428 .build ();
429+ List <QueueDetail > queueDetails ;
406430 if (queueDetail .getPriority ().size () <= 1 ) {
407- return Collections .singletonList (queueDetail );
431+ queueDetails = Collections .singletonList (queueDetail );
432+ } else {
433+ queueDetails = queueDetail .expandQueueDetail (true , -1 );
408434 }
409- return queueDetail .expandQueueDetail (
410- rqueueConfig .isAddDefaultQueueWithQueueLevelPriority (),
411- rqueueConfig .getDefaultQueueWithQueueLevelPriority ());
435+ return queueDetails ;
412436 }
413437
414438 @ Override
@@ -450,6 +474,7 @@ protected void doStart() {
450474
451475 private Map <String , QueueThreadPool > getQueueThreadMap (
452476 String groupName , List <QueueDetail > queueDetails ) {
477+ // this happens only for queue having priorities like critical:10,high:5,low:3
453478 QueueThreadPool queueThreadPool = queueThreadMap .get (groupName );
454479 if (queueThreadPool != null ) {
455480 return queueDetails .stream ()
0 commit comments