55
66class MoleculerClient (MoleculerNode ):
77
8- def __init__ (self , amqp_url ):
9- super ().__init__ (amqp_url , node_id = 'PYTHON-CLIENT' )
8+ def __init__ (self , amqp_url , namespace = None ):
9+ self .namespace = namespace
10+ super ().__init__ (amqp_url , node_id = 'PYTHON-CLIENT' , namespace = self .namespace )
1011 self .network = NetworkInfo ()
12+ if self .namespace is None :
13+ self .info_template = 'MOL.INFO.{node_id}'
14+ self .disconnect_template = 'MOL.DISCONNECT.{node_id}'
15+ self .eventb_template = 'MOL.EVENTB.{service}.{event}'
16+ self .event_template = 'MOL.EVENT.{node_id}'
17+ else :
18+ self .info_template = 'MOL-{namespace}.INFO.{node_id}'
19+ self .disconnect_template = 'MOL-{namespace}.DISCONNECT.{node_id}'
20+ self .eventb_template = 'MOL-{namespace}.EVENTB.{service}.{event}'
21+ self .event_template = 'MOL-{namespace}.EVENT.{node_id}'
1122
1223 def on_channel_open (self , channel ):
1324 LOGGER .info ('Channel opened' )
1425 self .channel : Channel = channel
1526 self .add_on_channel_close_callback ()
16- info_queue = 'MOL.INFO.{node_id}' . format (node_id = self .NODE_ID )
17- disconnect_queue = 'MOL.DISCONNECT.{node_id}' . format (node_id = self .NODE_ID )
27+ info_queue = self . info_template . format (node_id = self .NODE_ID , namespace = self . namespace )
28+ disconnect_queue = self . disconnect_template . format (node_id = self .NODE_ID , namespace = self . namespace )
1829 self .setup_queue (info_queue )
1930 self .setup_queue (disconnect_queue )
20- self .channel .queue_bind (self .on_bindok , info_queue , 'MOL. INFO' )
21- self .channel .queue_bind (self .on_bindok , disconnect_queue , 'MOL. DISCONNECT' )
31+ self .channel .queue_bind (self .on_bindok , info_queue , self . moleculer_topics . exchanges [ ' INFO'] )
32+ self .channel .queue_bind (self .on_bindok , disconnect_queue , self . moleculer_topics . exchanges [ ' DISCONNECT'] )
2233 self .channel .basic_consume (self .process_info_packages , info_queue )
2334 self .channel .basic_consume (self .on_node_disconnect , disconnect_queue )
2435 self .discover_packet ()
@@ -40,7 +51,9 @@ def emit(self, event_name, data=None):
4051 data = {}
4152 event_package = MoleculerClient .build_event ('PYTHON-CLIENT' , event_name , data )
4253 for service_name in candidates :
43- queue_name = 'MOL.EVENTB.{service}.{event}' .format (service = service_name , event = event_name )
54+ queue_name = self .eventb_template .format (service = service_name , event = event_name ,
55+ namespace = self .namespace )
56+ print (queue_name )
4457 self .channel .basic_publish ('' , queue_name , event_package )
4558
4659 def broadcast (self , event_name , data = None ):
@@ -52,7 +65,7 @@ def broadcast(self, event_name, data=None):
5265 data = {}
5366 event_package = MoleculerClient .build_event ('PYTHON-CLIENT' , event_name , data )
5467 for node_id in candidates :
55- queue_name = 'MOL.EVENT.{node_id}' . format (node_id = node_id )
68+ queue_name = self . event_template . format (node_id = node_id , namespace = self . namespace )
5669 self .channel .basic_publish ('' , queue_name , event_package )
5770
5871 def call (self ):
@@ -91,6 +104,15 @@ def build_event(sender_node_id, event_name, payload):
91104class NetworkInfo :
92105 NODES = {}
93106
107+ def __init__ (self , namespace = None ):
108+ self .namespace = namespace
109+ if self .namespace is None :
110+ self .reqb_template = 'MOL.REQB.{action}'
111+ self .service_reqb_template = 'MOL.REQB.{service_name}.{action}'
112+ else :
113+ self .reqb_template = 'MOL.REQB.{action}' .replace ('MOL' , 'MOL-' + self .namespace )
114+ self .service_reqb_template = 'MOL.REQB.{service_name}.{action}' .replace ('MOL' , 'MOL-' + self .namespace )
115+
94116 def add_node (self , info_packet : dict ):
95117 node_id = info_packet ['sender' ]
96118 if node_id not in self .NODES .keys ():
@@ -104,10 +126,10 @@ def add_node(self, info_packet: dict):
104126 is_service_node = bool (service_name == '$node' )
105127 for action_name , action_spec in service ['actions' ].items ():
106128 if is_service_node :
107- queue_name = 'MOL.REQB.{action}' . format (action = action_name )
129+ queue_name = self . reqb_template . format (action = action_name , namespace = self . namespace )
108130 else :
109- queue_name = 'MOL.REQB.{service_name}.{action}' . format (service_name = service_name ,
110- action = action_name )
131+ queue_name = self . service_reqb_template . format (service_name = service_name , action = action_name ,
132+ namespace = self . namespace )
111133 node ['actions' ][action_name ] = queue_name
112134
113135 for event_name in service ['events' ].keys ():
0 commit comments