11#!/usr/bin/env python
22import sys
3+ import itertools
34import time
45from stompy import Stomp
56from optparse import OptionParser
67
8+ def callback_func (message ):
9+ print ("Using Callback Function" )
10+ print (message .headers .get ('message-id' ))
11+ print (message .body )
12+ stomp .ack (message )
713
8- def consume (host , port , queue , num = None ):
9- try :
10- stomp = Stomp (host , port )
11- # optional connect keyword args "username" and "password" like so:
12- # stomp.connect(username="user", password="pass")
13- stomp .connect ()
14- except :
15- print ("Cannot connect" )
16- raise
14+ def consume (queue , num = None , callback = None ):
1715
1816 # If using RabbitMQ, the queue seems to 'disappear'
1917 # after disconnecting a consumer, to make the queue persistent
2018 # add the headers 'auto-delete': 'false' and 'durable': 'true'
2119 # to the dictionary below
2220 stomp .subscribe ({'destination' : queue , 'ack' : 'client' })
2321
24- if not num :
25- while True :
26- try :
27- frame = stomp .receive_frame ( )
28- stomp . ack ( frame )
29- print ( frame . headers . get ( 'message-id' ))
30- print ( frame . body )
31- except KeyboardInterrupt :
32- stomp . disconnect ()
33- break
34- else :
35- for i in xrange ( 0 , num ):
36- try :
22+ def _handle_message ( frame ) :
23+ print ( frame . headers . get ( 'message-id' ))
24+ print ( frame . body )
25+ stomp .ack ( frame )
26+
27+ # if num is not set, iterate forever.
28+ it = xrange ( 0 , num ) if num else itertools . count ( )
29+
30+ try :
31+ for i in it :
32+ if callback :
33+ stomp . receive_frame ( callback = callback_func )
34+ else :
3735 frame = stomp .receive_frame ()
38- stomp .ack (frame )
39- print (frame .headers .get ('message-id' ))
40- print (frame .body )
41- except KeyboardInterrupt :
42- stomp .disconnect ()
43- break
36+ _handle_message (frame )
37+ finally :
4438 stomp .disconnect ()
4539
4640
47- def produce (host , port , queue , num = 1000 ):
48- try :
49- stomp = Stomp (host , port )
50- # optional connect keyword args "username" and "password" like so:
51- # stomp.connect(username="user", password="pass")
52- stomp .connect ()
53- except :
54- print ("Cannot connect" )
55- raise
41+ def produce (queue , num = 1000 ):
5642
5743 for i in xrange (0 , num ):
5844 print ("Message #%d" % i )
@@ -75,6 +61,9 @@ def produce(host, port, queue, num=1000):
7561 default = False , dest = 'produce' , help = 'produce messages' )
7662 parser .add_option ('-c' , '--consume' , action = 'store_true' ,
7763 default = False , dest = 'consume' , help = 'consume messages' )
64+ parser .add_option ('-C' , '--use-callback' , action = 'store_true' ,
65+ default = False , dest = 'callback' ,
66+ help = 'send retrieved message to python callable' )
7867 parser .add_option ('-n' , '--number' , action = 'store' ,
7968 type = 'int' , dest = 'number' ,
8069 help = 'produce or consume NUMBER messages' )
@@ -94,7 +83,16 @@ def produce(host, port, queue, num=1000):
9483 parser .print_help ()
9584 sys .exit (1 )
9685
86+ try :
87+ stomp = Stomp (options .host , options .port )
88+ # optional connect keyword args "username" and "password" like so:
89+ # stomp.connect(username="user", password="pass")
90+ stomp .connect ()
91+ except :
92+ print ("Cannot connect" )
93+ raise
94+
9795 if options .produce :
98- produce (options .host , options . port , options . queue , options .number )
96+ produce (options .queue , options .number )
9997 elif options .consume :
100- consume (options .host , options .port , options .queue , options . number )
98+ consume (options .queue , options .number , options .callback )
0 commit comments