Skip to content

Commit 92d96e4

Browse files
committed
added support of balanced requests (call method)
1 parent fe2f16e commit 92d96e4

3 files changed

Lines changed: 24 additions & 4 deletions

File tree

moleculer/node.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,13 +132,16 @@ def subscribe_to_topics(self):
132132
self._channel.basic_consume(self.consumer.request, self.moleculer_topics.queues['REQUEST'])
133133
self._channel.basic_consume(self.consumer.response, self.moleculer_topics.queues['RESPONSE'])
134134
self._channel.basic_consume(self.consumer.event, self.moleculer_topics.queues['EVENT'])
135+
for queue_name in self.moleculer_topics.action_queues:
136+
self._channel.basic_consume(self.consumer.request, queue_name)
135137
self._connection.add_timeout(0.5, self.discover_packet)
136138
else:
137139
self._connection.add_timeout(0.1, self.subscribe_to_topics)
138140

139141
def create_topics(self):
140142
queues = self.moleculer_topics.queues.items()
141-
self.expect_topics_count = len(queues) + len(MOLECULER_EXCHANGES)
143+
action_queues = self.moleculer_topics.action_queues
144+
self.expect_topics_count = len(queues) + len(MOLECULER_EXCHANGES) + len(action_queues)
142145

143146
for queue_type, queue_name in queues:
144147
if queue_type in ('REQUEST', 'RESPONSE'):
@@ -147,6 +150,10 @@ def create_topics(self):
147150
self.setup_queue(queue_name, ttl=True, exclusive=True)
148151
else:
149152
self.setup_queue(queue_name, ttl=True, exclusive=False)
153+
154+
for queue_name in action_queues:
155+
self.setup_queue(queue_name, ttl=False, exclusive=False, durable=True)
156+
150157
for exchange_type, exchange_name in MOLECULER_EXCHANGES.items():
151158
self.setup_exchange(exchange_name)
152159

@@ -227,7 +234,7 @@ def on_exchange_declareok(self, unused_frame):
227234
LOGGER.info('Exchange declared')
228235
self.ready_topics.append(None)
229236

230-
def setup_queue(self, queue_name, ttl=True, exclusive=False):
237+
def setup_queue(self, queue_name, ttl=True, exclusive=False, durable=False):
231238
"""Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
232239
command. When it is complete, the on_queue_declareok method will
233240
be invoked by pika.
@@ -241,7 +248,8 @@ def setup_queue(self, queue_name, ttl=True, exclusive=False):
241248
arguments = {}
242249
if ttl:
243250
arguments['x-message-ttl'] = 5000 # eventTimeToLive: https://github.com/ice-services/moleculer/pull/72
244-
self._channel.queue_declare(self.on_queue_declareok, queue_name, exclusive=exclusive, arguments=arguments)
251+
self._channel.queue_declare(self.on_queue_declareok, queue_name,
252+
exclusive=exclusive, durable=durable, arguments=arguments)
245253

246254
def on_queue_declareok(self, method_frame):
247255
"""Method invoked by pika when the Queue.Declare RPC call made in

moleculer/service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def event_handler(sender_node_id: str, event: str, payload: dict):
3535
'ver': '2',
3636
'sender': None,
3737
'services': [{
38-
'actions': {'$python.test': service_builder('$python.test')},
38+
'actions': {'python_test': service_builder('python_test')},
3939
'events': {},
4040
'metadata': {},
4141
'name': '$python',

moleculer/topics.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
from .service import INFO_PACKET_TEMPLATE
2+
3+
14
EXCHANGES = {
25
'DISCOVER': 'MOL.DISCOVER',
36
'INFO': 'MOL.INFO',
@@ -40,3 +43,12 @@ def bindings(self):
4043
if queue_type in EXCHANGES:
4144
result[queue_name] = EXCHANGES[queue_type]
4245
return result
46+
47+
@property
48+
def action_queues(self):
49+
result = []
50+
for service in INFO_PACKET_TEMPLATE['services']:
51+
service_name = service['name']
52+
for action in service['actions'].keys():
53+
result.append('MOL.REQB.{service_name}.{action}'.format(service_name=service_name, action=action))
54+
return result

0 commit comments

Comments
 (0)