Skip to content

Commit 6fc11c3

Browse files
committed
added support of namespaces
1 parent 6271df9 commit 6fc11c3

3 files changed

Lines changed: 66 additions & 22 deletions

File tree

moleculer/consumer.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,27 @@
11
import json
2-
from .topics import EXCHANGES as MOLECULER_EXCHANGES
32
import datetime
43
from .service import request_handler, INFO_PACKET_TEMPLATE, event_handler
54

65

76
class MoleculerConsumer:
87

9-
def __init__(self, node_id):
8+
def __init__(self, node_id, moleculer_topics, namespace=None):
9+
self.namespace = namespace
10+
self.moleculer_topics = moleculer_topics
1011
self.node_id = node_id
1112
self.is_node_discovered = False
13+
if self.namespace is None:
14+
self.info_template = 'MOL.INFO.{node_id}'
15+
else:
16+
self.info_template = 'MOL-{namespace}.INFO.{node_id}'
17+
if self.namespace is None:
18+
self.res_template = 'MOL.RES.{node_id}'
19+
else:
20+
self.res_template = 'MOL-{namespace}.RES.{node_id}'
21+
if self.namespace is None:
22+
self.pong_template = 'MOL.PONG.{node_id}'
23+
else:
24+
self.pong_template = 'MOL-{namespace}.PONG.{node_id}'
1225

1326
def build_info_package(self):
1427
info_packet = INFO_PACKET_TEMPLATE
@@ -19,7 +32,7 @@ def build_info_package(self):
1932
def discover(self, channel, basic_deliver, properties, body):
2033
discover_packet = json.loads(body)
2134
sender = discover_packet['sender']
22-
sender_queue = 'MOL.INFO.{node_id}'.format(node_id=sender)
35+
sender_queue = self.info_template.format(node_id=sender, namespace=self.namespace)
2336
info_packet = self.build_info_package() # TODO: reuse same package
2437
channel.basic_publish('', sender_queue, json.dumps(info_packet))
2538

@@ -31,7 +44,7 @@ def info(self, channel, basic_deliver, properties, body):
3144
info_packet = INFO_PACKET_TEMPLATE
3245
info_packet['sender'] = self.node_id
3346
info_packet['services'][0]['nodeID'] = self.node_id
34-
channel.basic_publish(MOLECULER_EXCHANGES['INFO'], '', json.dumps(info_packet))
47+
channel.basic_publish(self.moleculer_topics.exchanges['INFO'], '', json.dumps(info_packet))
3548
self.is_node_discovered = True
3649
else:
3750
pass # TODO: save discovered services
@@ -43,7 +56,7 @@ def ping(self, channel, basic_deliver, properties, body):
4356
ping_packet = json.loads(body)
4457
sender_node_id, time = ping_packet['sender'], ping_packet['time']
4558
if sender_node_id != self.node_id:
46-
sender_exchange = 'MOL.PONG.{node_id}'.format(node_id=sender_node_id)
59+
sender_exchange = self.pong_template.format(node_id=sender_node_id, namespace=self.namespace)
4760
pong_packet = {
4861
'ver': '2',
4962
'sender': self.node_id,
@@ -69,7 +82,7 @@ def request(self, channel, basic_deliver, properties, body):
6982
'success': True,
7083
'data': {'result': 'Response from python node: ' + self.node_id}
7184
}
72-
sender_exchange = 'MOL.RES.{node_id}'.format(node_id=sender)
85+
sender_exchange = self.res_template.format(node_id=sender, namespace=self.namespace)
7386
channel.basic_publish('', sender_exchange, json.dumps(response_packet))
7487

7588
def disconnect(self, channel, basic_deliver, properties, body):
@@ -82,7 +95,7 @@ def response(self, channel, basic_deliver, properties, body):
8295
# TODO: handle responses from other services
8396

8497
def event(self, channel, basic_deliver, properties, body):
85-
print('EVENT!!!!')
98+
# print('EVENT!!!!')
8699
event_packet = json.loads(body)
87100
sender, event, data = event_packet['sender'], event_packet['event'], event_packet['data']
88101
event_handler(sender, event, data)

moleculer/node.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import json
66
import psutil
77

8-
from .topics import MoleculerTopics, EXCHANGES as MOLECULER_EXCHANGES
8+
from .topics import MoleculerTopics
99
from pika.channel import Channel
1010
from pika.adapters.select_connection import SelectConnection
1111
from .consumer import MoleculerConsumer
@@ -29,7 +29,7 @@ class MoleculerNode(object):
2929
NODE_ID = 'python-node-1'
3030
HEARTBEAT_INTERVAL = 5
3131

32-
def __init__(self, amqp_url, node_id=None):
32+
def __init__(self, amqp_url, node_id=None, namespace=None):
3333
"""Setup the example publisher object, passing in the URL we will use
3434
to connect to RabbitMQ.
3535
@@ -54,8 +54,11 @@ def __init__(self, amqp_url, node_id=None):
5454
if node_id is not None:
5555
self.NODE_ID = node_id
5656

57-
self.moleculer_topics = MoleculerTopics(self.NODE_ID)
58-
self.consumer = MoleculerConsumer(self.NODE_ID)
57+
self.namespace = namespace
58+
59+
self.moleculer_topics = MoleculerTopics(self.NODE_ID, namespace=self.namespace)
60+
self.consumer = MoleculerConsumer(self.NODE_ID, moleculer_topics=self.moleculer_topics,
61+
namespace=self.namespace)
5962

6063
def connect(self):
6164
"""This method connects to RabbitMQ, returning the connection handle.
@@ -115,7 +118,7 @@ def start_heartbeating(self):
115118
'sender': self.NODE_ID,
116119
'cpu': psutil.cpu_percent(interval=None)
117120
}
118-
self.channel.basic_publish(MOLECULER_EXCHANGES['HEARTBEAT'], '',
121+
self.channel.basic_publish(self.moleculer_topics.exchanges['HEARTBEAT'], '',
119122
json.dumps(heartbeat_packet))
120123
self._connection.add_timeout(self.HEARTBEAT_INTERVAL, self.start_heartbeating)
121124

@@ -142,7 +145,8 @@ def subscribe_to_topics(self):
142145
def create_topics(self):
143146
queues = self.moleculer_topics.queues.items()
144147
action_queues, events_queues = self.moleculer_topics.action_queues, self.moleculer_topics.event_queues
145-
self.expect_topics_count = len(queues) + len(MOLECULER_EXCHANGES) + len(action_queues) + len(events_queues)
148+
self.expect_topics_count = len(queues) + len(self.moleculer_topics.exchanges) + len(action_queues) + len(
149+
events_queues)
146150

147151
for queue_type, queue_name in queues:
148152
if queue_type in ('REQUEST', 'RESPONSE'):
@@ -158,7 +162,7 @@ def create_topics(self):
158162
for queue_name in events_queues:
159163
self.setup_queue(queue_name, ttl=True, exclusive=False)
160164

161-
for exchange_type, exchange_name in MOLECULER_EXCHANGES.items():
165+
for exchange_type, exchange_name in self.moleculer_topics.exchanges.items():
162166
self.setup_exchange(exchange_name)
163167

164168
self._connection.add_timeout(0.1, self.check_topics_status)
@@ -196,7 +200,7 @@ def discover_packet(self):
196200
'ver': '2',
197201
'sender': self.NODE_ID
198202
}
199-
self.channel.basic_publish(MOLECULER_EXCHANGES['DISCOVER'], '', json.dumps(req))
203+
self.channel.basic_publish(self.moleculer_topics.exchanges['DISCOVER'], '', json.dumps(req))
200204

201205
def on_channel_closed(self, channel, reply_code, reply_text):
202206
"""Invoked by pika when RabbitMQ unexpectedly closes the channel.
@@ -332,7 +336,7 @@ def stop(self):
332336
'ver': '2',
333337
'sender': self.NODE_ID
334338
}
335-
self.channel.basic_publish(MOLECULER_EXCHANGES['DISCONNECT'], '',
339+
self.channel.basic_publish(self.moleculer_topics.exchanges['DISCONNECT'], '',
336340
json.dumps(disconnect_packet))
337341
self._stopping = True
338342
self.close_channel()

moleculer/topics.py

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@
99
'DISCONNECT': 'MOL.DISCONNECT'
1010
}
1111

12+
REQB = 'MOL.REQB.{service_name}.{action}'
13+
REQB_NAMESPACE = 'MOL-{namespace}.REQB.{service_name}.{action}'
14+
EVENTB = 'MOL.EVENTB.{service_name}.{event}'
15+
EVENTB_NAMESPACE = 'MOL-{namespace}.EVENTB.{service_name}.{event}'
16+
1217

1318
class MoleculerTopics:
1419
EVENT_QUEUE = 'MOL.EVENT.{node_id}'
@@ -32,32 +37,54 @@ def queues(self):
3237
result[attr.replace('_QUEUE', '')] = getattr(self, attr)
3338
return result
3439

35-
def __init__(self, node_id):
40+
def __init__(self, node_id, namespace=None):
3641
for queue_name in self.queue_attrs:
37-
setattr(self, queue_name, getattr(MoleculerTopics, queue_name).format(node_id=node_id))
42+
if namespace is None:
43+
setattr(self, queue_name, getattr(MoleculerTopics, queue_name).format(node_id=node_id))
44+
else:
45+
queue_string = getattr(MoleculerTopics, queue_name).format(node_id=node_id)
46+
queue_string = queue_string.repalce('MOL', 'MOL-' + namespace)
47+
setattr(self, queue_name, queue_string)
48+
self.namespace = namespace
3849

3950
@property
4051
def bindings(self):
4152
result = {}
4253
for queue_type, queue_name in self.queues.items():
43-
if queue_type in EXCHANGES:
44-
result[queue_name] = EXCHANGES[queue_type]
54+
exchanges = self.exchanges
55+
if queue_type in exchanges:
56+
result[queue_name] = exchanges[queue_type]
4557
return result
4658

4759
@property
4860
def action_queues(self):
61+
if self.namespace is None:
62+
template = REQB
63+
else:
64+
template = REQB_NAMESPACE
4965
result = []
5066
for service in INFO_PACKET_TEMPLATE['services']:
5167
service_name = service['name']
5268
for action in service['actions'].keys():
53-
result.append('MOL.REQB.{service_name}.{action}'.format(service_name=service_name, action=action))
69+
result.append(template.format(service_name=service_name, action=action, namespace=self.namespace))
5470
return result
5571

5672
@property
5773
def event_queues(self):
74+
if self.namespace is None:
75+
template = EVENTB
76+
else:
77+
template = EVENTB_NAMESPACE
5878
result = []
5979
for service in INFO_PACKET_TEMPLATE['services']:
6080
service_name = service['name']
6181
for event in service['events'].keys():
62-
result.append('MOL.EVENTB.{service_name}.{event}'.format(service_name=service_name, event=event))
82+
result.append(template.format(service_name=service_name, event=event, namespace=self.namespace))
6383
return result
84+
85+
@property
86+
def exchanges(self):
87+
if self.namespace is None:
88+
return EXCHANGES
89+
else:
90+
return {x: x.replace('MOL', 'MOL-' + self.namespace) for x in EXCHANGES}

0 commit comments

Comments
 (0)