Skip to content

Commit d4251dd

Browse files
committed
added support for events, disabled prefetch
1 parent 92d96e4 commit d4251dd

4 files changed

Lines changed: 25 additions & 9 deletions

File tree

moleculer/consumer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def request(self, channel, basic_deliver, properties, body):
5757
'sender': self.node_id,
5858
'id': request_id,
5959
'success': True,
60-
'data': {'result': 'Response from python node'}
60+
'data': {'result': 'Response from python node: ' + self.node_id}
6161
}
6262
sender_exchange = 'MOL.RES.{node_id}'.format(node_id=sender)
6363
channel.basic_publish('', sender_exchange, json.dumps(response_packet))

moleculer/node.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ class MoleculerNode(object):
2929
"""
3030

3131
EXCHANGE_TYPE = 'fanout'
32-
NODE_ID = 'python-node-2'
32+
NODE_ID = 'python-node-1'
3333
HEARTBEAT_INTERVAL = 5
3434

3535
def __init__(self, amqp_url, node_id=None):
@@ -105,7 +105,7 @@ def open_channel(self):
105105
def on_channel_open(self, channel):
106106
LOGGER.info('Channel opened')
107107
self._channel: Channel = channel
108-
self._channel.basic_qos(prefetch_count=1)
108+
# self._channel.basic_qos(prefetch_count=1)
109109
# self._channel.confirm_delivery() # Enabled delivery confirmations
110110
self.add_on_channel_close_callback()
111111
self.create_topics()
@@ -132,27 +132,34 @@ def subscribe_to_topics(self):
132132
self._channel.basic_consume(self.consumer.request, self.moleculer_topics.queues['REQUEST'])
133133
self._channel.basic_consume(self.consumer.response, self.moleculer_topics.queues['RESPONSE'])
134134
self._channel.basic_consume(self.consumer.event, self.moleculer_topics.queues['EVENT'])
135+
135136
for queue_name in self.moleculer_topics.action_queues:
136137
self._channel.basic_consume(self.consumer.request, queue_name)
138+
for queue_name in self.moleculer_topics.event_queues:
139+
self._channel.basic_consume(self.consumer.event, queue_name, no_ack=True)
140+
137141
self._connection.add_timeout(0.5, self.discover_packet)
138142
else:
139143
self._connection.add_timeout(0.1, self.subscribe_to_topics)
140144

141145
def create_topics(self):
142146
queues = self.moleculer_topics.queues.items()
143-
action_queues = self.moleculer_topics.action_queues
144-
self.expect_topics_count = len(queues) + len(MOLECULER_EXCHANGES) + len(action_queues)
147+
action_queues, events_queues = self.moleculer_topics.action_queues, self.moleculer_topics.event_queues
148+
self.expect_topics_count = len(queues) + len(MOLECULER_EXCHANGES) + len(action_queues) + len(events_queues)
145149

146150
for queue_type, queue_name in queues:
147151
if queue_type in ('REQUEST', 'RESPONSE'):
148152
self.setup_queue(queue_name, ttl=False, exclusive=False)
149153
elif queue_type == 'HEARTBEAT':
150-
self.setup_queue(queue_name, ttl=True, exclusive=True)
154+
self.setup_queue(queue_name, ttl=False, exclusive=True)
151155
else:
152156
self.setup_queue(queue_name, ttl=True, exclusive=False)
153157

154158
for queue_name in action_queues:
155-
self.setup_queue(queue_name, ttl=False, exclusive=False, durable=True)
159+
self.setup_queue(queue_name, ttl=False, exclusive=False, durable=False)
160+
161+
for queue_name in events_queues:
162+
self.setup_queue(queue_name, ttl=True, exclusive=False)
156163

157164
for exchange_type, exchange_name in MOLECULER_EXCHANGES.items():
158165
self.setup_exchange(exchange_name)

moleculer/service.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ def event_handler(sender_node_id: str, event: str, payload: dict):
3535
'ver': '2',
3636
'sender': None,
3737
'services': [{
38-
'actions': {'python_test': service_builder('python_test')},
39-
'events': {},
38+
'actions': {'test': service_builder('test')}, # DO NOT name like service_name.action name. Just action name
39+
'events': {'event_test': {'name': 'event_test'}},
4040
'metadata': {},
4141
'name': '$python',
4242
'nodeID': None,

moleculer/topics.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,12 @@ def action_queues(self):
5252
for action in service['actions'].keys():
5353
result.append('MOL.REQB.{service_name}.{action}'.format(service_name=service_name, action=action))
5454
return result
55+
56+
@property
57+
def event_queues(self):
58+
result = []
59+
for service in INFO_PACKET_TEMPLATE['services']:
60+
service_name = service['name']
61+
for event in service['events'].keys():
62+
result.append('MOL.EVENTB.{service_name}.{event}'.format(service_name=service_name, event=event))
63+
return result

0 commit comments

Comments
 (0)