Skip to content

Commit 9f8d35e

Browse files
committed
Added Python client for Moleculer
1 parent 76538a3 commit 9f8d35e

5 files changed

Lines changed: 165 additions & 43 deletions

File tree

client_example.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from moleculer.client import MoleculerClient
2+
import threading
3+
import time
4+
5+
6+
def main():
    """Demo: run a Moleculer client in a background thread, then fire a
    balanced emit and a broadcast of the same test event."""
    amqp_url = 'amqp://guest:guest@localhost:5672/%2F?connection_attempts=3&heartbeat_interval=3600'
    client = MoleculerClient(amqp_url)

    # The pika ioloop blocks, so the client runs in its own thread.
    worker = threading.Thread(target=client.run)
    worker.start()

    # wait for initialization
    time.sleep(5)  # TODO: extract 'ready' event from thread
    print(client.emit('event_test'))

    time.sleep(3)
    print(client.broadcast('event_test'))


if __name__ == '__main__':
    main()

moleculer/client.py

Lines changed: 101 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,105 @@
1-
import pika
1+
from .node import MoleculerNode, LOGGER
2+
from pika.channel import Channel
3+
import json
24

35

4-
class MoleculerClient:
6+
class MoleculerClient(MoleculerNode):
57

6-
def __init__(self, username='guest', password='guest', host='localhost'):
7-
url = 'amqp://{username}:{password}@{host}:5672/%2F?connection_attempts=3&heartbeat_interval=3600'.format(
8-
username=username, password=password, host=host
9-
)
10-
connection = pika.BlockingConnection(pika.URLParameters(url))
11-
self.channel = connection.channel()
8+
def __init__(self, amqp_url):
9+
super().__init__(amqp_url, node_id='PYTHON-CLIENT')
10+
self.network = NetworkInfo()
1211

13-
def publish_event(self, node_id=None, balanced=False, event_name=None):
14-
if balanced:
15-
pass
12+
def on_channel_open(self, channel):
13+
LOGGER.info('Channel opened')
14+
self.channel: Channel = channel
15+
self.add_on_channel_close_callback()
16+
info_queue = 'MOL.INFO.{node_id}'.format(node_id=self.NODE_ID)
17+
self.setup_queue(info_queue)
18+
self.channel.queue_bind(self.on_bindok, info_queue, 'MOL.INFO')
19+
self.channel.basic_consume(self.process_info_packages, info_queue)
20+
self.discover_packet()
21+
22+
def process_info_packages(self, unused_channel, basic_deliver, properties, body):
23+
info_packet = json.loads(body)
24+
self.network.add_node(info_packet)
25+
26+
def emit(self, event_name, data=None):
27+
candidates = self.get_emit_candidates(event_name)
28+
if len(candidates) == 0:
29+
return {'error': 'This event not registered.'}
30+
else:
31+
if data is None:
32+
data = {}
33+
event_package = MoleculerClient.build_event('PYTHON-CLIENT', event_name, data)
34+
for service_name in candidates:
35+
queue_name = 'MOL.EVENTB.{service}.{event}'.format(service=service_name, event=event_name)
36+
self.channel.basic_publish('', queue_name, event_package)
37+
38+
def broadcast(self, event_name, data=None):
39+
candidates = self.get_broadcast_candidates(event_name)
40+
if len(candidates) == 0:
41+
return {'error': 'This event not registered.'}
42+
else:
43+
if data is None:
44+
data = {}
45+
event_package = MoleculerClient.build_event('PYTHON-CLIENT', event_name, data)
46+
for node_id in candidates:
47+
queue_name = 'MOL.EVENT.{node_id}'.format(node_id=node_id)
48+
self.channel.basic_publish('', queue_name, event_package)
49+
50+
def get_emit_candidates(self, event_name):
51+
service_names = set()
52+
for node_id, node_info in self.network.NODES.items():
53+
if event_name in node_info['events']:
54+
service_name = node_info['service_name']
55+
service_names.add(service_name)
56+
return service_names
57+
58+
def get_broadcast_candidates(self, event_name):
59+
candidates = []
60+
for node_id, node_info in self.network.NODES.items():
61+
if event_name in node_info['events']:
62+
candidates.append(node_id)
63+
return candidates
64+
65+
@staticmethod
66+
def build_event(sender_node_id, event_name, payload):
67+
event = {
68+
'ver': '2',
69+
'sender': sender_node_id,
70+
'event': event_name,
71+
'data': payload,
72+
'groups': []
73+
}
74+
return json.dumps(event)
75+
76+
77+
class NetworkInfo:
78+
NODES = {}
79+
80+
def add_node(self, info_packet: dict):
81+
node_id = info_packet['sender']
82+
if node_id not in self.NODES.keys():
83+
self.NODES[node_id] = {
84+
'actions': {},
85+
'events': []
86+
}
87+
node = self.NODES[node_id]
88+
for service in info_packet['services']:
89+
service_name = service['name']
90+
is_service_node = bool(service_name == '$node')
91+
for action_name, action_spec in service['actions'].items():
92+
if is_service_node:
93+
queue_name = 'MOL.REQB.{action}'.format(action=action_name)
94+
else:
95+
queue_name = 'MOL.REQB.{service_name}.{action}'.format(service_name=service_name,
96+
action=action_name)
97+
node['actions'][action_name] = queue_name
98+
99+
for event_name in service['events'].keys():
100+
node['events'].append(event_name)
101+
else:
102+
node['service_name'] = service_name
103+
104+
def disconnect_node(self, node_id):
105+
del self.NODES[node_id]

moleculer/consumer.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,18 @@ def __init__(self, node_id):
1010
self.node_id = node_id
1111
self.is_node_discovered = False
1212

13-
def discover(self, unused_channel, basic_deliver, properties, body):
14-
pass
13+
def build_info_package(self):
14+
info_packet = INFO_PACKET_TEMPLATE
15+
info_packet['sender'] = self.node_id
16+
info_packet['services'][0]['nodeID'] = self.node_id
17+
return info_packet
18+
19+
def discover(self, channel, basic_deliver, properties, body):
20+
discover_packet = json.loads(body)
21+
sender = discover_packet['sender']
22+
sender_queue = 'MOL.INFO.{node_id}'.format(node_id=sender)
23+
info_packet = self.build_info_package() # TODO: reuse same package
24+
channel.basic_publish('', sender_queue, json.dumps(info_packet))
1525

1626
def info(self, channel, basic_deliver, properties, body):
1727
if not self.is_node_discovered:
@@ -67,10 +77,12 @@ def disconnect(self, channel, basic_deliver, properties, body):
6777
pass
6878

6979
def response(self, channel, basic_deliver, properties, body):
70-
channel.basic_ack(basic_deliver.delivery_tag)
80+
# channel.basic_ack(basic_deliver.delivery_tag)
81+
pass
7182
# TODO: handle responses from other services
7283

7384
def event(self, channel, basic_deliver, properties, body):
85+
print('EVENT!!!!')
7486
event_packet = json.loads(body)
7587
sender, event, data = event_packet['sender'], event_packet['event'], event_packet['data']
7688
event_handler(sender, event, data)

moleculer/node.py

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def __init__(self, amqp_url, node_id=None):
3737
3838
"""
3939
self._connection = None
40-
self._channel: Channel = None
40+
self.channel: Channel = None
4141

4242
self._deliveries = None
4343
self._acked = None
@@ -87,7 +87,7 @@ def on_connection_closed(self, connection, reply_code, reply_text):
8787
:param str reply_text: The server provided reply_text if given
8888
8989
"""
90-
self._channel = None
90+
self.channel = None
9191
if self._stopping:
9292
self._connection.ioloop.stop()
9393
else:
@@ -101,7 +101,7 @@ def open_channel(self):
101101

102102
def on_channel_open(self, channel):
103103
LOGGER.info('Channel opened')
104-
self._channel: Channel = channel
104+
self.channel: Channel = channel
105105
# self._channel.basic_qos(prefetch_count=1) # TODO: figure out why prefetch must be disabled to make it work
106106
# self._channel.confirm_delivery() # Enabled delivery confirmations
107107
self.add_on_channel_close_callback()
@@ -115,25 +115,25 @@ def start_heartbeating(self):
115115
'sender': self.NODE_ID,
116116
'cpu': psutil.cpu_percent(interval=None)
117117
}
118-
self._channel.basic_publish(MOLECULER_EXCHANGES['HEARTBEAT'], '',
119-
json.dumps(heartbeat_packet))
118+
self.channel.basic_publish(MOLECULER_EXCHANGES['HEARTBEAT'], '',
119+
json.dumps(heartbeat_packet))
120120
self._connection.add_timeout(self.HEARTBEAT_INTERVAL, self.start_heartbeating)
121121

122122
def subscribe_to_topics(self):
123123
if self.is_bindings_ready:
124124
self.add_on_cancel_callback()
125125
# for queue in self.moleculer_topics.queues.values():
126-
self._channel.basic_consume(self.consumer.discover, self.moleculer_topics.queues['DISCOVER'])
127-
self._channel.basic_consume(self.consumer.info, self.moleculer_topics.queues['INFO'])
128-
self._channel.basic_consume(self.consumer.ping, self.moleculer_topics.queues['PING'])
129-
self._channel.basic_consume(self.consumer.request, self.moleculer_topics.queues['REQUEST'])
130-
self._channel.basic_consume(self.consumer.response, self.moleculer_topics.queues['RESPONSE'])
131-
self._channel.basic_consume(self.consumer.event, self.moleculer_topics.queues['EVENT'])
126+
self.channel.basic_consume(self.consumer.discover, self.moleculer_topics.queues['DISCOVER'])
127+
self.channel.basic_consume(self.consumer.info, self.moleculer_topics.queues['INFO'])
128+
self.channel.basic_consume(self.consumer.ping, self.moleculer_topics.queues['PING'])
129+
self.channel.basic_consume(self.consumer.request, self.moleculer_topics.queues['REQUEST'])
130+
self.channel.basic_consume(self.consumer.response, self.moleculer_topics.queues['RESPONSE'])
131+
self.channel.basic_consume(self.consumer.event, self.moleculer_topics.queues['EVENT'], no_ack=True)
132132

133133
for queue_name in self.moleculer_topics.action_queues:
134-
self._channel.basic_consume(self.consumer.request, queue_name)
134+
self.channel.basic_consume(self.consumer.request, queue_name)
135135
for queue_name in self.moleculer_topics.event_queues:
136-
self._channel.basic_consume(self.consumer.event, queue_name, no_ack=True)
136+
self.channel.basic_consume(self.consumer.event, queue_name, no_ack=True)
137137

138138
self._connection.add_timeout(0.5, self.discover_packet)
139139
else:
@@ -181,22 +181,22 @@ def is_bindings_ready(self):
181181
def bind_queues_to_exchanges(self):
182182
self.expect_bindings_count = len(self.moleculer_topics.bindings)
183183
for queue_name, fanout_name in self.moleculer_topics.bindings.items():
184-
self._channel.queue_bind(self.on_bindok, queue_name, fanout_name)
184+
self.channel.queue_bind(self.on_bindok, queue_name, fanout_name)
185185

186186
def add_on_channel_close_callback(self):
187187
"""This method tells pika to call the on_channel_closed method if
188188
RabbitMQ unexpectedly closes the channel.
189189
190190
"""
191191
LOGGER.info('Adding channel close callback')
192-
self._channel.add_on_close_callback(self.on_channel_closed)
192+
self.channel.add_on_close_callback(self.on_channel_closed)
193193

194194
def discover_packet(self):
195195
req = {
196196
'ver': '2',
197197
'sender': self.NODE_ID
198198
}
199-
self._channel.basic_publish(MOLECULER_EXCHANGES['DISCOVER'], '', json.dumps(req))
199+
self.channel.basic_publish(MOLECULER_EXCHANGES['DISCOVER'], '', json.dumps(req))
200200

201201
def on_channel_closed(self, channel, reply_code, reply_text):
202202
"""Invoked by pika when RabbitMQ unexpectedly closes the channel.
@@ -211,7 +211,7 @@ def on_channel_closed(self, channel, reply_code, reply_text):
211211
212212
"""
213213
LOGGER.warning('Channel was closed: (%s) %s', reply_code, reply_text)
214-
self._channel = None
214+
self.channel = None
215215
if not self._stopping:
216216
self._connection.close()
217217

@@ -224,9 +224,9 @@ def setup_exchange(self, exchange_name):
224224
225225
"""
226226
LOGGER.info('Declaring exchange %s', exchange_name)
227-
self._channel.exchange_declare(self.on_exchange_declareok,
228-
exchange_name,
229-
self.EXCHANGE_TYPE, durable=True)
227+
self.channel.exchange_declare(self.on_exchange_declareok,
228+
exchange_name,
229+
self.EXCHANGE_TYPE, durable=True)
230230

231231
def on_exchange_declareok(self, unused_frame):
232232
"""Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
@@ -252,8 +252,8 @@ def setup_queue(self, queue_name, ttl=True, exclusive=False, durable=False):
252252
arguments = {}
253253
if ttl:
254254
arguments['x-message-ttl'] = 5000 # eventTimeToLive: https://github.com/ice-services/moleculer/pull/72
255-
self._channel.queue_declare(self.on_queue_declareok, queue_name,
256-
exclusive=exclusive, durable=durable, arguments=arguments)
255+
self.channel.queue_declare(self.on_queue_declareok, queue_name,
256+
exclusive=exclusive, durable=durable, arguments=arguments)
257257

258258
def on_queue_declareok(self, method_frame):
259259
"""Method invoked by pika when the Queue.Declare RPC call made in
@@ -282,7 +282,7 @@ def add_on_cancel_callback(self):
282282
283283
"""
284284
LOGGER.info('Adding consumer cancellation callback')
285-
self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
285+
self.channel.add_on_cancel_callback(self.on_consumer_cancelled)
286286

287287
def on_consumer_cancelled(self, method_frame):
288288
"""Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer
@@ -293,8 +293,8 @@ def on_consumer_cancelled(self, method_frame):
293293
"""
294294
LOGGER.info('Consumer was cancelled remotely, shutting down: %r',
295295
method_frame)
296-
if self._channel:
297-
self._channel.close()
296+
if self.channel:
297+
self.channel.close()
298298

299299
def run(self):
300300
"""Run the service code by connecting and then starting the IOLoop.
@@ -332,8 +332,8 @@ def stop(self):
332332
'ver': '2',
333333
'sender': self.NODE_ID
334334
}
335-
self._channel.basic_publish(MOLECULER_EXCHANGES['DISCONNECT'], '',
336-
json.dumps(disconnect_packet))
335+
self.channel.basic_publish(MOLECULER_EXCHANGES['DISCONNECT'], '',
336+
json.dumps(disconnect_packet))
337337
self._stopping = True
338338
self.close_channel()
339339
self.close_connection()
@@ -343,9 +343,9 @@ def close_channel(self):
343343
the Channel.Close RPC command.
344344
345345
"""
346-
if self._channel is not None:
346+
if self.channel is not None:
347347
LOGGER.info('Closing the channel')
348-
self._channel.close()
348+
self.channel.close()
349349

350350
def close_connection(self):
351351
"""This method closes the connection to RabbitMQ."""
File renamed without changes.

0 commit comments

Comments
 (0)