[rabbitmq-discuss] nonblocking py-amqplib issues

majek04 majek04 at gmail.com
Tue Nov 25 11:25:34 GMT 2008


I tried to use amqplib.nbclient_0_8, but I had some problems with it.
I attached the code [1].

1. More than one message in the queue:
console #1:
$ ./rec_2.py  (this is the attached code)
<I pressed ctrl+z to suspend the program>
[1]+  Stopped                 ./rec_2.py

console #2:
$ ./demo_send.py 1 (standard amqplib demo program)
$ ./demo_send.py 2

back to console #1:
$ fg (to resume the program)
Traceback (most recent call last):
  File "./rec_2.py", line 31, in <module>
    amqp.nbloop([ch])
  File "build/bdist.linux-i686/egg/amqplib/nbclient_0_8.py", line 176, in nbloop
  File "build/bdist.linux-i686/egg/amqplib/client_0_8.py", line 203, in wait
  File "build/bdist.linux-i686/egg/amqplib/client_0_8.py", line 117,
in _dispatch
  File "build/bdist.linux-i686/egg/amqplib/client_0_8.py", line 2854,
in _basic_deliver
  File "./rec_2.py", line 7, in callback
    msg.channel.basic_ack(msg.delivery_tag)
  File "build/bdist.linux-i686/egg/amqplib/client_0_8.py", line 2609,
in basic_ack
  File "build/bdist.linux-i686/egg/amqplib/client_0_8.py", line 127,
in _send_method_frame
  File "build/bdist.linux-i686/egg/amqplib/client_0_8.py", line 384,
in _send_channel_method_frame
  File "build/bdist.linux-i686/egg/amqplib/nbclient_0_8.py", line 74, in write
<P�' read_buf='&<!amq.ctag-rdYQarPUansf8+2dGsnFAQ==�6<<!amq.ctag-rdYQarPUansf8+2dGsnFAQ==myfan2<�
text/plainfooIbarSbaz�1�6<<!amq.ctag-rdYQarPUansf8+2dGsnFAQ==myfan2<�
text/plainfooIbarSbaz�2�' read_p='175'

2. Order of the messages is wrong.
I just changed the callback function and disabled basic_ack:

def callback(msg):
    m.append( msg.body )
    #### msg.channel.basic_ack(msg.delivery_tag) # disable ack

Console #1:
$ ./rec2_py

Console #2:
$ ./demo_send.py 1
$ ./demo_send.py 2
$ ./demo_send.py 3
$ ./demo_send.py 4

Back to console #1:
['1', '1']
['2', '1', '2']
['3', '1', '2', '3']
['4', '1', '2', '3', '4']


3. no_ack option for basic_consume is not working.
With disabled basic_ack, and added no_ack to basic_consume:
    ch.consumer_tag = ch.basic_consume(qname, callback=callback, no_ack=True)
I get exactly the same results as before.



I hope there is something that can be done to address this issues.

Cheers!
 Marek Majkowski


[1] The code of rec_2.py:
http://www.lshift.net/~majek/rec_2.py
I paste it also here:

import amqplib.nbclient_0_8 as amqp

m = []
def callback(msg):
    m.append( msg.body )
    msg.channel.basic_ack(msg.delivery_tag)

class NoActivityException(Exception):pass

def my_nb_callback(ch): raise NoActivityException

if __name__ == '__main__':
    conn = amqp.NonBlockingConnection('localhost',
         userid='guest', password='guest',
         nb_callback=my_nb_callback, insist=True, nb_sleep=0.0)

    ch = conn.channel()
    ch.access_request('/data', active=True, read=True)

    ch.exchange_declare('myfan', 'fanout', auto_delete=True)
    qname, _, _ = ch.queue_declare()
    ch.queue_bind(qname, 'myfan')
    ch.consumer_tag = ch.basic_consume(qname, callback=callback)

    while True:
        conn.sock.sock.setblocking(1)
        conn.sock.sock.recv(0)
        conn.sock.sock.setblocking(0)
        try:
            amqp.nbloop([ch])
        except NoActivityException, e:
            pass
        print "%r" % (m,)
        while m: m.pop()


More information about the rabbitmq-discuss mailing list