@@ -45,13 +45,19 @@ class RdKafkaContext implements Context
4545 */
4646 private $ kafkaConsumers ;
4747
48+ /**
49+ * @var RdKafkaConsumer[]
50+ */
51+ private $ rdKafkaConsumers ;
52+
4853 /**
4954 * @param array $config
5055 */
5156 public function __construct (array $ config )
5257 {
5358 $ this ->config = $ config ;
5459 $ this ->kafkaConsumers = [];
60+ $ this ->rdKafkaConsumers = [];
5561
5662 $ this ->setSerializer (new JsonSerializer ());
5763 }
@@ -102,26 +108,33 @@ public function createConsumer(Destination $destination): Consumer
102108 {
103109 InvalidDestinationException::assertDestinationInstanceOf ($ destination , RdKafkaTopic::class);
104110
105- $ this -> kafkaConsumers [] = $ kafkaConsumer = new KafkaConsumer ( $ this -> getConf () );
111+ $ queueName = $ destination -> getQueueName ( );
106112
107- $ consumer = new RdKafkaConsumer (
108- $ kafkaConsumer ,
109- $ this ,
110- $ destination ,
111- $ this ->getSerializer ()
112- );
113+ if (!isset ($ this ->rdKafkaConsumers [$ queueName ])) {
114+ $ this ->kafkaConsumers [] = $ kafkaConsumer = new KafkaConsumer ($ this ->getConf ());
115+
116+ $ consumer = new RdKafkaConsumer (
117+ $ kafkaConsumer ,
118+ $ this ,
119+ $ destination ,
120+ $ this ->getSerializer ()
121+ );
122+
123+ if (isset ($ this ->config ['commit_async ' ])) {
124+ $ consumer ->setCommitAsync ($ this ->config ['commit_async ' ]);
125+ }
113126
114- if (isset ($ this ->config ['commit_async ' ])) {
115- $ consumer ->setCommitAsync ($ this ->config ['commit_async ' ]);
127+ $ this ->rdKafkaConsumers [$ queueName ] = $ consumer ;
116128 }
117129
118- return $ consumer ;
130+ return $ this -> rdKafkaConsumers [ $ queueName ] ;
119131 }
120132
121133 public function close (): void
122134 {
123135 $ kafkaConsumers = $ this ->kafkaConsumers ;
124136 $ this ->kafkaConsumers = [];
137+ $ this ->rdKafkaConsumers = [];
125138
126139 foreach ($ kafkaConsumers as $ kafkaConsumer ) {
127140 $ kafkaConsumer ->unsubscribe ();
0 commit comments