Skip to content

Commit c21ea3d

Browse files
committed
refactoring
1 parent 4a0e40f commit c21ea3d

3 files changed

Lines changed: 354 additions & 339 deletions

File tree

main.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
from moleculer.client import MoleculerClient, LOG_FORMAT
1+
from moleculer.node import MoleculerNode, LOG_FORMAT
22
import logging
33

44

55
def main():
66
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
77

88
# Connect to localhost:5672 as guest with the password guest and virtual host "/" (%2F)
9-
service = MoleculerClient('amqp://guest:guest@localhost:5672/%2F?connection_attempts=3&heartbeat_interval=3600')
9+
service = MoleculerNode('amqp://guest:guest@localhost:5672/%2F?connection_attempts=3&heartbeat_interval=3600')
1010
service.run()
1111

1212

moleculer/client.py

Lines changed: 10 additions & 337 deletions
Original file line numberDiff line numberDiff line change
@@ -1,342 +1,15 @@
1-
# -*- coding: utf-8 -*-
2-
3-
import logging
41
import pika
5-
import json
6-
import psutil
7-
8-
from .topics import MoleculerTopics, EXCHANGES as MOLECULER_EXCHANGES
9-
from pika.channel import Channel
10-
from pika.adapters.select_connection import SelectConnection
11-
from .consumer import MoleculerConsumer
12-
13-
LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
14-
'-35s %(lineno) -5d: %(message)s')
15-
LOGGER = logging.getLogger(__name__)
16-
17-
18-
class MoleculerClient(object):
19-
"""
20-
21-
If RabbitMQ closes the connection, it will reopen it. You should
22-
look at the output, as there are limited reasons why the connection may
23-
be closed, which usually are tied to permission related issues or
24-
socket timeouts.
25-
26-
It uses delivery confirmations and illustrates one way to keep track of
27-
messages that have been sent and if they've been confirmed by RabbitMQ.
28-
29-
"""
30-
31-
EXCHANGE_TYPE = 'fanout'
32-
APP_NAME = 'Test'
33-
NODE_ID = 'python-node-2'
34-
HEARTBEAT_INTERVAL = 5
35-
# EVENT_TOPICS = ('MOL.REQB.$node.events', 'MOL.REQB.$node.list')
36-
37-
def __init__(self, amqp_url, app_name=None):
38-
"""Setup the example publisher object, passing in the URL we will use
39-
to connect to RabbitMQ.
40-
41-
:param str amqp_url: The URL for connecting to RabbitMQ
42-
43-
"""
44-
self._connection = None
45-
self._channel: Channel = None
46-
47-
self._deliveries = None
48-
self._acked = None
49-
self._nacked = None
50-
self._message_number = None
51-
52-
self._stopping = False
53-
self._url = amqp_url
54-
self.ready_topics = []
55-
self.expect_topics_count = None
56-
self.ready_bindings = []
57-
self.expect_bindings_count = None
58-
self.moleculer_topics = MoleculerTopics(self.NODE_ID)
59-
self.consumer = MoleculerConsumer(self.NODE_ID)
60-
61-
if app_name is not None:
62-
self.APP_NAME = app_name
63-
64-
def connect(self):
65-
"""This method connects to RabbitMQ, returning the connection handle.
66-
When the connection is established, the on_connection_open method
67-
will be invoked by pika. If you want the reconnection to work, make
68-
sure you set stop_ioloop_on_close to False, which is not the default
69-
behavior of this adapter.
70-
71-
:rtype: pika.SelectConnection
72-
73-
"""
74-
LOGGER.info('Connecting to %s', self._url)
75-
return pika.SelectConnection(pika.URLParameters(self._url),
76-
on_open_callback=self.on_connection_open,
77-
on_close_callback=self.on_connection_closed,
78-
stop_ioloop_on_close=False)
79-
80-
def on_connection_open(self, unused_connection):
81-
LOGGER.info('Connection opened')
82-
self.open_channel()
83-
84-
def on_connection_closed(self, connection, reply_code, reply_text):
85-
"""This method is invoked by pika when the connection to RabbitMQ is
86-
closed unexpectedly. Since it is unexpected, we will reconnect to
87-
RabbitMQ if it disconnects.
88-
89-
:param pika.connection.Connection connection: The closed connection obj
90-
:param int reply_code: The server provided reply_code if given
91-
:param str reply_text: The server provided reply_text if given
92-
93-
"""
94-
self._channel = None
95-
if self._stopping:
96-
self._connection.ioloop.stop()
97-
else:
98-
LOGGER.warning('Connection closed, reopening in 5 seconds: (%s) %s',
99-
reply_code, reply_text)
100-
self._connection.add_timeout(5, self._connection.ioloop.stop)
101-
102-
def open_channel(self):
103-
LOGGER.info('Creating a new channel')
104-
self._connection.channel(on_open_callback=self.on_channel_open)
105-
106-
def on_channel_open(self, channel):
107-
LOGGER.info('Channel opened')
108-
self._channel: Channel = channel
109-
self._channel.basic_qos(prefetch_count=1)
110-
# self._channel.confirm_delivery() # Enabled delivery confirmations
111-
self.add_on_channel_close_callback()
112-
self.create_topics()
113-
self._connection.add_timeout(0.2, self.subscribe_to_topics)
114-
self._connection.add_timeout(1, self.start_heartbeating)
115-
116-
def start_heartbeating(self):
117-
heartbeat_packet = {
118-
'ver': '2',
119-
'sender': self.NODE_ID,
120-
'cpu': psutil.cpu_percent(interval=None)
121-
}
122-
self._channel.basic_publish(MOLECULER_EXCHANGES['HEARTBEAT'], '',
123-
json.dumps(heartbeat_packet))
124-
self._connection.add_timeout(self.HEARTBEAT_INTERVAL, self.start_heartbeating)
125-
126-
def subscribe_to_topics(self):
127-
if self.is_bindings_ready:
128-
self.add_on_cancel_callback()
129-
# for queue in self.moleculer_topics.queues.values():
130-
self._channel.basic_consume(self.consumer.discover, self.moleculer_topics.queues['DISCOVER'])
131-
self._channel.basic_consume(self.consumer.info, self.moleculer_topics.queues['INFO'])
132-
self._channel.basic_consume(self.consumer.ping, self.moleculer_topics.queues['PING'])
133-
self._channel.basic_consume(self.consumer.request, self.moleculer_topics.queues['REQUEST'])
134-
self._channel.basic_consume(self.consumer.response, self.moleculer_topics.queues['RESPONSE'])
135-
self._connection.add_timeout(0.5, self.discover_packet)
136-
else:
137-
self._connection.add_timeout(0.1, self.subscribe_to_topics)
138-
139-
def create_topics(self):
140-
queues = self.moleculer_topics.queues.items()
141-
self.expect_topics_count = len(queues) + len(MOLECULER_EXCHANGES)
142-
143-
for queue_type, queue_name in queues:
144-
if queue_type in ('REQUEST', 'RESPONSE'):
145-
self.setup_queue(queue_name, ttl=False, auto_delete=False)
146-
elif queue_type == 'HEARTBEAT':
147-
self.setup_queue(queue_name, ttl=True, auto_delete=True)
148-
else:
149-
self.setup_queue(queue_name, ttl=True, auto_delete=False)
150-
for exchange_type, exchange_name in MOLECULER_EXCHANGES.items():
151-
self.setup_exchange(exchange_name)
152-
153-
self._connection.add_timeout(0.1, self.check_topics_status)
154-
155-
def check_topics_status(self):
156-
if len(self.ready_topics) == self.expect_topics_count:
157-
LOGGER.info('All topics successfully declared')
158-
self.bind_queues_to_exchanges()
159-
else:
160-
self._connection.add_timeout(0.1, self.check_topics_status)
161-
162-
@property
163-
def is_bindings_ready(self):
164-
if len(self.ready_bindings) == self.expect_bindings_count:
165-
LOGGER.info('All bindings successfully declared.')
166-
return True
167-
else:
168-
return False
169-
170-
def bind_queues_to_exchanges(self):
171-
self.expect_bindings_count = len(self.moleculer_topics.bindings)
172-
for queue_name, fanout_name in self.moleculer_topics.bindings.items():
173-
self._channel.queue_bind(self.on_bindok, queue_name, fanout_name)
174-
175-
def add_on_channel_close_callback(self):
176-
"""This method tells pika to call the on_channel_closed method if
177-
RabbitMQ unexpectedly closes the channel.
178-
179-
"""
180-
LOGGER.info('Adding channel close callback')
181-
self._channel.add_on_close_callback(self.on_channel_closed)
182-
183-
def discover_packet(self):
184-
req = {
185-
'ver': '2',
186-
'sender': self.NODE_ID
187-
}
188-
self._channel.basic_publish(MOLECULER_EXCHANGES['DISCOVER'], '', json.dumps(req))
189-
190-
def on_channel_closed(self, channel, reply_code, reply_text):
191-
"""Invoked by pika when RabbitMQ unexpectedly closes the channel.
192-
Channels are usually closed if you attempt to do something that
193-
violates the protocol, such as re-declare an exchange or queue with
194-
different parameters. In this case, we'll close the connection
195-
to shutdown the object.
196-
197-
:param pika.channel.Channel channel: The closed channel
198-
:param int reply_code: The numeric reason the channel was closed
199-
:param str reply_text: The text reason the channel was closed
200-
201-
"""
202-
LOGGER.warning('Channel was closed: (%s) %s', reply_code, reply_text)
203-
self._channel = None
204-
if not self._stopping:
205-
self._connection.close()
206-
207-
def setup_exchange(self, exchange_name):
208-
"""Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
209-
command. When it is complete, the on_exchange_declareok method will
210-
be invoked by pika.
211-
212-
:param str|unicode exchange_name: The name of the exchange to declare
213-
214-
"""
215-
LOGGER.info('Declaring exchange %s', exchange_name)
216-
self._channel.exchange_declare(self.on_exchange_declareok,
217-
exchange_name,
218-
self.EXCHANGE_TYPE, durable=True)
219-
220-
def on_exchange_declareok(self, unused_frame):
221-
"""Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
222-
command.
223-
224-
:param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
225-
226-
"""
227-
LOGGER.info('Exchange declared')
228-
self.ready_topics.append(None)
229-
230-
def setup_queue(self, queue_name, ttl=True, auto_delete=False):
231-
"""Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
232-
command. When it is complete, the on_queue_declareok method will
233-
be invoked by pika.
234-
235-
:param auto_delete:
236-
:param ttl:
237-
:param str|unicode queue_name: The name of the queue to declare.
238-
239-
"""
240-
LOGGER.info('Declaring queue %s', queue_name)
241-
arguments = {}
242-
if ttl:
243-
arguments['x-message-ttl'] = 5000 # eventTimeToLive: https://github.com/ice-services/moleculer/pull/72
244-
self._channel.queue_declare(self.on_queue_declareok, queue_name, auto_delete=auto_delete, arguments=arguments)
245-
246-
def on_queue_declareok(self, method_frame):
247-
"""Method invoked by pika when the Queue.Declare RPC call made in
248-
setup_queue has completed. In this method we will bind the queue
249-
and exchange together with the routing key by issuing the Queue.Bind
250-
RPC command. When this command is complete, the on_bindok method will
251-
be invoked by pika.
252-
253-
:param pika.frame.Method method_frame: The Queue.DeclareOk frame
254-
255-
"""
256-
LOGGER.info('Queue for moleculer declared')
257-
self.ready_topics.append(None)
258-
259-
def on_bindok(self, unused_frame):
260-
"""This method is invoked by pika when it receives the Queue.BindOk
261-
response from RabbitMQ. Since we know we're now setup and bound, it's
262-
time to start publishing."""
263-
LOGGER.info('Queue bound to exchange.')
264-
self.ready_bindings.append(None)
265-
266-
def add_on_cancel_callback(self):
267-
"""Add a callback that will be invoked if RabbitMQ cancels the consumer
268-
for some reason. If RabbitMQ does cancel the consumer,
269-
on_consumer_cancelled will be invoked by pika.
270-
271-
"""
272-
LOGGER.info('Adding consumer cancellation callback')
273-
self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
274-
275-
def on_consumer_cancelled(self, method_frame):
276-
"""Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer
277-
receiving messages.
278-
279-
:param pika.frame.Method method_frame: The Basic.Cancel frame
280-
281-
"""
282-
LOGGER.info('Consumer was cancelled remotely, shutting down: %r',
283-
method_frame)
284-
if self._channel:
285-
self._channel.close()
286-
287-
def run(self):
288-
"""Run the service code by connecting and then starting the IOLoop.
289-
290-
"""
291-
while not self._stopping:
292-
self._connection = None
293-
self._deliveries = []
294-
self._acked = 0
295-
self._nacked = 0
296-
self._message_number = 0
297-
298-
try:
299-
self._connection: SelectConnection = self.connect()
300-
self._connection.ioloop.start()
301-
except KeyboardInterrupt:
302-
self.stop()
303-
if (self._connection is not None and
304-
not self._connection.is_closed):
305-
self._connection.ioloop.start()
306-
307-
LOGGER.info('Stopped')
308-
309-
def stop(self):
310-
"""Stop the example by closing the channel and connection. We
311-
set a flag here so that we stop scheduling new messages to be
312-
published. The IOLoop is started because this method is
313-
invoked by the Try/Catch below when KeyboardInterrupt is caught.
314-
Starting the IOLoop again will allow the publisher to cleanly
315-
disconnect from RabbitMQ.
3162

317-
"""
318-
LOGGER.info('Stopping')
319-
disconnect_packet = {
320-
'ver': '2',
321-
'sender': self.NODE_ID
322-
}
323-
self._channel.basic_publish(MOLECULER_EXCHANGES['DISCONNECT'], '',
324-
json.dumps(disconnect_packet))
325-
self._stopping = True
326-
self.close_channel()
327-
self.close_connection()
3283

329-
def close_channel(self):
330-
"""Invoke this command to close the channel with RabbitMQ by sending
331-
the Channel.Close RPC command.
4+
class MoleculerClient:
3325

333-
"""
334-
if self._channel is not None:
335-
LOGGER.info('Closing the channel')
336-
self._channel.close()
6+
def __init__(self, username='guest', password='guest', host='localhost'):
7+
url = 'amqp://{username}:{password}@{host}:5672/%2F?connection_attempts=3&heartbeat_interval=3600'.format(
8+
username=username, password=password, host=host
9+
)
10+
connection = pika.BlockingConnection(pika.URLParameters(url))
11+
self.channel = connection.channel()
33712

338-
def close_connection(self):
339-
"""This method closes the connection to RabbitMQ."""
340-
if self._connection is not None:
341-
LOGGER.info('Closing connection')
342-
self._connection.close()
13+
def publish_event(self, node_id=None, balanced=False, event_name=None):
14+
if balanced:
15+
pass

0 commit comments

Comments
 (0)