File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -45,6 +45,11 @@ class RdKafkaContext implements Context
4545 */
4646 private $ kafkaConsumers ;
4747
48+ /**
49+ * @var RdKafkaConsumer[]
50+ */
51+ private $ rdKafkaConsumers ;
52+
4853 /**
4954 * @param array $config
5055 */
@@ -102,20 +107,26 @@ public function createConsumer(Destination $destination): Consumer
102107 {
103108 InvalidDestinationException::assertDestinationInstanceOf ($ destination , RdKafkaTopic::class);
104109
105- $ this -> kafkaConsumers [] = $ kafkaConsumer = new KafkaConsumer ( $ this -> getConf () );
110+ $ queueName = $ destination -> getQueueName ( );
106111
107- $ consumer = new RdKafkaConsumer (
108- $ kafkaConsumer ,
109- $ this ,
110- $ destination ,
111- $ this ->getSerializer ()
112- );
112+ if (!isset ($ this ->rdKafkaConsumers [$ queueName ])) {
113+ $ this ->kafkaConsumers [] = $ kafkaConsumer = new KafkaConsumer ($ this ->getConf ());
114+
115+ $ consumer = new RdKafkaConsumer (
116+ $ kafkaConsumer ,
117+ $ this ,
118+ $ destination ,
119+ $ this ->getSerializer ()
120+ );
121+
122+ if (isset ($ this ->config ['commit_async ' ])) {
123+ $ consumer ->setCommitAsync ($ this ->config ['commit_async ' ]);
124+ }
113125
114- if (isset ($ this ->config ['commit_async ' ])) {
115- $ consumer ->setCommitAsync ($ this ->config ['commit_async ' ]);
126+ $ this ->rdKafkaConsumers [$ queueName ] = $ consumer ;
116127 }
117128
118- return $ consumer ;
129+ return $ this -> rdKafkaConsumers [ $ queueName ] ;
119130 }
120131
121132 public function close (): void
Original file line number Diff line number Diff line change @@ -69,4 +69,27 @@ public function testShouldInjectItsSerializerToConsumer()
6969
7070 $ this ->assertSame ($ context ->getSerializer (), $ producer ->getSerializer ());
7171 }
72+
73+ public function testShouldNotCreateConsumerTwice ()
74+ {
75+ $ context = new RdKafkaContext ([]);
76+ $ queue = $ context ->createQueue ('aQueue ' );
77+
78+ $ consumer = $ context ->createConsumer ($ queue );
79+ $ consumer2 = $ context ->createConsumer ($ queue );
80+
81+ $ this ->assertSame ($ consumer , $ consumer2 );
82+ }
83+
84+ public function testShouldCreateTwoConsumers ()
85+ {
86+ $ context = new RdKafkaContext ([]);
87+ $ queueA = $ context ->createQueue ('aQueue ' );
88+ $ queueB = $ context ->createQueue ('aQueue ' );
89+
90+ $ consumer = $ context ->createConsumer ($ queueA );
91+ $ consumer2 = $ context ->createConsumer ($ queueB );
92+
93+ $ this ->assertNotSame ($ consumer , $ consumer2 );
94+ }
7295}
You can’t perform that action at this time.
0 commit comments