Skip to content

Commit b2670ad

Browse files
committed
added event packet handling +small updates to meet transport options
1 parent 18e2ada commit b2670ad

3 files changed

Lines changed: 14 additions & 3 deletions

File tree

moleculer/client.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ def open_channel(self):
106106
def on_channel_open(self, channel):
107107
LOGGER.info('Channel opened')
108108
self._channel: Channel = channel
109+
self._channel.basic_qos(prefetch_count=1)
109110
# self._channel.confirm_delivery() # Enabled delivery confirmations
110111
self.add_on_channel_close_callback()
111112
self.create_topics()
@@ -236,7 +237,7 @@ def setup_queue(self, queue_name, ttl=True):
236237
LOGGER.info('Declaring queue %s', queue_name)
237238
arguments = {}
238239
if ttl:
239-
arguments['x-message-ttl'] = 3000
240+
arguments['x-message-ttl'] = 5000 # eventTimeToLive: https://github.com/ice-services/moleculer/pull/72
240241
self._channel.queue_declare(self.on_queue_declareok, queue_name, arguments=arguments)
241242

242243
def on_queue_declareok(self, method_frame):

moleculer/consumer.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import json
22
from .topics import EXCHANGES as MOLECULER_EXCHANGES
33
import datetime
4-
from .service import request_handler, INFO_PACKET_TEMPLATE
4+
from .service import request_handler, INFO_PACKET_TEMPLATE, event_handler
55

66

77
class MoleculerConsumer:
@@ -17,7 +17,7 @@ def info(self, channel, basic_deliver, properties, body):
1717
if not self.is_node_discovered:
1818
info_packet = json.loads(body)
1919
sender = info_packet['sender']
20-
if sender != self.node_id:
20+
if sender != self.node_id: # TODO: send info package anyway
2121
info_packet = INFO_PACKET_TEMPLATE
2222
info_packet['sender'] = self.node_id
2323
info_packet['services'][0]['nodeID'] = self.node_id
@@ -63,8 +63,14 @@ def request(self, channel, basic_deliver, properties, body):
6363
channel.basic_publish('', sender_exchange, json.dumps(response_packet))
6464

6565
def disconnect(self, channel, basic_deliver, properties, body):
66+
# TODO: remove disconnected services DISCOVERED list
6667
pass
6768

6869
def response(self, channel, basic_deliver, properties, body):
6970
channel.basic_ack(basic_deliver.delivery_tag)
7071
# TODO: handle responses from other services
72+
73+
def event(self, channel, basic_deliver, properties, body):
74+
event_packet = json.loads(body)
75+
sender, event, data = event_packet['sender'], event_packet['event'], event_packet['data']
76+
event_handler(sender, event, data)

moleculer/service.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ def request_handler(action: str, params: dict) -> bool:
2727
return True
2828

2929

30+
def event_handler(sender_node_id: str, event: str, payload: dict):
31+
pass
32+
33+
3034
INFO_PACKET_TEMPLATE = {
3135
'ver': '2',
3236
'sender': None,

0 commit comments

Comments
 (0)