[rabbitmq-discuss] nonblocking py-amqplib issues
majek04
majek04 at gmail.com
Tue Nov 25 11:25:34 GMT 2008
I tried to use amqplib.nbclient_0_8 and ran into three problems.
The code is included below [1].
1. More than one message in the queue:
console #1:
$ ./rec_2.py (this is the attached code)
<I pressed ctrl+z to suspend the program>
[1]+ Stopped ./rec_2.py
console #2:
$ ./demo_send.py 1 (standard amqplib demo program)
$ ./demo_send.py 2
back to console #1:
$ fg (to resume the program)
Traceback (most recent call last):
  File "./rec_2.py", line 31, in <module>
    amqp.nbloop([ch])
  File "build/bdist.linux-i686/egg/amqplib/nbclient_0_8.py", line 176, in nbloop
  File "build/bdist.linux-i686/egg/amqplib/client_0_8.py", line 203, in wait
  File "build/bdist.linux-i686/egg/amqplib/client_0_8.py", line 117, in _dispatch
  File "build/bdist.linux-i686/egg/amqplib/client_0_8.py", line 2854, in _basic_deliver
  File "./rec_2.py", line 7, in callback
    msg.channel.basic_ack(msg.delivery_tag)
  File "build/bdist.linux-i686/egg/amqplib/client_0_8.py", line 2609, in basic_ack
  File "build/bdist.linux-i686/egg/amqplib/client_0_8.py", line 127, in _send_method_frame
  File "build/bdist.linux-i686/egg/amqplib/client_0_8.py", line 384, in _send_channel_method_frame
  File "build/bdist.linux-i686/egg/amqplib/nbclient_0_8.py", line 74, in write
<P�' read_buf='&<!amq.ctag-rdYQarPUansf8+2dGsnFAQ==�6<<!amq.ctag-rdYQarPUansf8+2dGsnFAQ==myfan2<�
text/plainfooIbarSbaz�1�6<<!amq.ctag-rdYQarPUansf8+2dGsnFAQ==myfan2<�
text/plainfooIbarSbaz�2�' read_p='175'
2. Order of the messages is wrong.
I just changed the callback function and disabled basic_ack:
def callback(msg):
    m.append( msg.body )
    #### msg.channel.basic_ack(msg.delivery_tag) # disable ack
Console #1:
$ ./rec_2.py
Console #2:
$ ./demo_send.py 1
$ ./demo_send.py 2
$ ./demo_send.py 3
$ ./demo_send.py 4
Back to console #1:
['1', '1']
['2', '1', '2']
['3', '1', '2', '3']
['4', '1', '2', '3', '4']
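To make the pattern in that output explicit, here is a small standalone check. It is only a sketch of one possible reading of the output, not a diagnosis: each printed list looks like the newly sent body followed by a replay of every body sent so far. The names `observed`, `sent`, `split_batch` and `matches_replay_pattern` are made up for this illustration and are not part of amqplib. The data is copied from the console output above.

```python
# Output pasted from console #1 above, one list per printed line.
observed = [
    ['1', '1'],
    ['2', '1', '2'],
    ['3', '1', '2', '3'],
    ['4', '1', '2', '3', '4'],
]
# Bodies sent from console #2, in order.
sent = ['1', '2', '3', '4']


def split_batch(batch):
    """Split one printed list into (first_body, remaining_bodies)."""
    return batch[0], batch[1:]


def matches_replay_pattern(batches, sent_bodies):
    """True if each batch is the new body followed by every body sent so far."""
    if len(batches) != len(sent_bodies):
        return False
    for i, batch in enumerate(batches):
        first, rest = split_batch(batch)
        if first != sent_bodies[i] or rest != sent_bodies[:i + 1]:
            return False
    return True


# What a consumer that saw each message exactly once would print with
# the receive loop from rec_2.py: one single-element list per message.
expected = [[body] for body in sent]

print(matches_replay_pattern(observed, sent))  # the replay reading fits
print(observed == expected)                    # the output is not the expected one
```

If this reading is right, the callback is invoked once for the new message and then again for everything already delivered, which would explain both the duplicates and the apparent reordering. That is an assumption drawn from the output alone.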
3. no_ack option for basic_consume is not working.
With basic_ack still disabled and no_ack=True added to basic_consume:
ch.consumer_tag = ch.basic_consume(qname, callback=callback, no_ack=True)
I get exactly the same results as before.
I hope something can be done to address these issues.
Cheers!
Marek Majkowski
[1] The code of rec_2.py:
http://www.lshift.net/~majek/rec_2.py
I have also pasted it here:
import amqplib.nbclient_0_8 as amqp

m = []

def callback(msg):
    m.append( msg.body )
    msg.channel.basic_ack(msg.delivery_tag)

class NoActivityException(Exception): pass

def my_nb_callback(ch): raise NoActivityException

if __name__ == '__main__':
    conn = amqp.NonBlockingConnection('localhost',
        userid='guest', password='guest',
        nb_callback=my_nb_callback, insist=True, nb_sleep=0.0)
    ch = conn.channel()
    ch.access_request('/data', active=True, read=True)
    ch.exchange_declare('myfan', 'fanout', auto_delete=True)
    qname, _, _ = ch.queue_declare()
    ch.queue_bind(qname, 'myfan')
    ch.consumer_tag = ch.basic_consume(qname, callback=callback)
    while True:
        conn.sock.sock.setblocking(1)
        conn.sock.sock.recv(0)
        conn.sock.sock.setblocking(0)
        try:
            amqp.nbloop([ch])
        except NoActivityException, e:
            pass
        print "%r" % (m,)
        while m: m.pop()
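The setblocking(1) / recv(0) / setblocking(0) sequence in the loop above appears to be there to wait until the socket has data before calling nbloop. A more conventional way to express that wait is select.select, which leaves the socket in non-blocking mode throughout. The following is a minimal sketch using only the standard library. The helper name `wait_readable` is made up for this example, and a local socket pair stands in for the broker connection. In rec_2.py the socket would presumably be conn.sock.sock. It does not address the three issues above; it only removes the blocking-mode toggling.

```python
import select
import socket


def wait_readable(sock, timeout=None):
    """Block until sock has data to read; return False if the timeout expires."""
    readable, _, _ = select.select([sock], [], [], timeout)
    return bool(readable)


# Stand-in for the broker connection: a connected local socket pair.
a, b = socket.socketpair()
a.setblocking(0)

print(wait_readable(a, timeout=0.1))  # nothing has been sent yet
b.send(b'x')
print(wait_readable(a, timeout=0.1))  # one byte is now waiting on a

a.close()
b.close()
```

With timeout=None the call blocks indefinitely, which matches what the original loop seems to intend.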