[rabbitmq-discuss] pika and node.amqp interop

Dan Tenenbaum dtenenba at fhcrc.org
Sun Jan 23 05:00:01 GMT 2011


On Sat, Jan 22, 2011 at 12:56 PM, Dan Tenenbaum <dtenenba at fhcrc.org> wrote:

>
>
> On Sat, Jan 22, 2011 at 2:46 AM, Michael Bridgen <mikeb at rabbitmq.com>wrote:
>
>> Hi Dan,
>>
>>
>>  I am new to rabbitmq. I got the python "getting started" examples to
>>> work just fine.
>>> I would like to be able to send messages between node.js (using
>>> node.amqp) and python/pika.
>>>
>> >
>>
>>> I can successfully receive a message in node.js using this code:
>>>
>>> [receiver - node.js]
>>> var sys = require('sys');
>>> var amqp = require('./amqp');
>>>
>>> var connection = amqp.createConnection({ host: 'localhost' });
>>>
>>> // Wait for connection to become established.
>>> connection.addListener('ready', function () {
>>>   // Create a queue and bind to all messages.
>>>   // Use the default 'amq.topic' exchange
>>>
>>>   var q = connection.queue('my-queue');
>>>   // Catch all messages
>>>   q.bind('#');
>>>
>>>   // Receive messages
>>>   q.subscribe(function (message) {
>>>     // Print messages to stdout
>>>     sys.puts(sys.inspect(message));
>>>   });
>>> });
>>>
>>> [sender - python/pika]
>>> #!/usr/bin/env python
>>> import pika
>>>
>>> connection = pika.AsyncoreConnection(pika.ConnectionParameters(
>>>         host='localhost'))
>>> channel = connection.channel()
>>>
>>>
>>> channel.basic_publish(exchange='',
>>>                       routing_key='my-queue',
>>>                       body='Hello World!')
>>> print " [x] Sent 'Hello World!'"
>>> connection.close()
>>>
>>
>> You should be aware that node-amqp, at least Ryan's original and most
>> forks of it, make some unorthodox choices for parameter defaults and
>> behaviour, which to be fair are probably inherited from the EventMachine
>> AMQP client.
>>
>> In particular, the exchange that's published or bound to, if none is
>> given, is "amq.topic".  This is directly at odds with AMQP, for which an
>> empty exchange name means the "default exchange" defined in the
>> specification; that is, route directly to the queue named in the routing
>> key.
>>
>> So your example above is working by coincidence -- you don't need to bind
>> the queue in the node.js code, since your Python code is effectively sending
>> straight to the queue.  I.e., the node.js code sets up
>>
>> (amq.topic) --"#"--> [ | | my-queue | | ]
>>
>> but the message sent in the Python code actually follows
>>
>> (default) --"my-queue"--> [ | | my-queue | | ]
>>
>>
>>  But I can't go the other way. I'm not sure that the message sent by
>>> node.js is going anywhere. I've looked at the unit tests for node.amqp
>>> (they all pass) but there is something I am missing, probably to do with
>>> exchanges or something. I'd like to do something like this:
>>>
>>> [sender - node.js]
>>> var sys = require('sys');
>>> var amqp = require('./amqp');
>>>
>>> var connection = amqp.createConnection({ host: 'localhost' });
>>>
>>> // Wait for connection to become established.
>>> connection.addListener('ready', function () {
>>>   // Create a queue and bind to all messages.
>>>   // Use the default 'amq.topic' exchange
>>>   connection.publish("my-queue", {random_key:"this is my message"});
>>>   connection.end();
>>> });
>>>
>>> [receiver - python/pika]
>>> #!/usr/bin/env python
>>> import pika
>>> import sys
>>>
>>> connection = pika.AsyncoreConnection(pika.ConnectionParameters(
>>>         host='localhost'))
>>> channel = connection.channel()
>>>
>>> print ' [*] Waiting for messages. To exit press CTRL+C'
>>>
>>> def callback(ch, method, properties, body):
>>>     print body,
>>>     sys.stdout.flush()
>>> channel.basic_consume(callback,
>>>                       queue='my-queue',
>>>                       no_ack=True)
>>>
>>> pika.asyncore_loop()
>>>
>>
>> This way around (presuming there's a restart in-between runs), node.js
>> sends to amq.topic, but nothing is bound there, so the message is discarded.
>>
>> You might try binding the queue, in the Python, to the exchange
>> "amq.topic".  You should probably also declare it.
>>
>> If you actually don't want to use "amq.topic", and you probably don't if
>> you can help it, you might declare your own exchange and use that.
>>
>>
> OK, I am trying this approach, with the following sender:
>
>
> var sys = require('sys');
> var amqp = require('./amqp');
>
> var connection = amqp.createConnection({ host: 'localhost' });
>
> // Wait for connection to become established.
> connection.addListener('ready', function () {
>   // Create a queue and bind to all messages.
>   // Use the default 'amq.topic' exchange
>   var exchange = connection.exchange("my-exchange");
>   exchange.publish("my-queue", {random_key:"this is my message"});
>   connection.end();
>
> });
>
> and receiver:
>
> #!/usr/bin/env python
> import pika
> import sys
>
>
> connection = pika.AsyncoreConnection(pika.ConnectionParameters(
>         host='localhost'))
> channel = connection.channel()
> result = channel.queue_declare('my-queue')
> channel.exchange_declare(exchange='my-exchange')
> channel.queue_bind(exchange='my-exchange', queue=result.queue)
>
>
> print ' [*] Waiting for messages. To exit press CTRL+C'
>
> def callback(ch, method, properties, body):
>     print body,
>     sys.stdout.flush()
>
> channel.basic_consume(callback,
>                       queue='my-queue',
>                       no_ack=True)
>
> pika.asyncore_loop()
>
> When I run the receiver, I get:
>
> % python receiver.py
> Traceback (most recent call last):
>   File "receiver.py", line 9, in <module>
>     result = channel.queue_declare('my-queue')
>   File "build/bdist.macosx-10.6-universal/egg/pika/spec.py", line 3003, in
> queue_declare
>   File "build/bdist.macosx-10.6-universal/egg/pika/channel.py", line 187,
> in _rpc
>   File "build/bdist.macosx-10.6-universal/egg/pika/connection.py", line
> 325, in _rpc
>   File "build/bdist.macosx-10.6-universal/egg/pika/connection.py", line
> 301, in send_method
>   File "build/bdist.macosx-10.6-universal/egg/pika/connection.py", line
> 295, in send_frame
>   File "build/bdist.macosx-10.6-universal/egg/pika/codec.py", line 74, in
> marshal
>   File "build/bdist.macosx-10.6-universal/egg/pika/spec.py", line 673, in
> encode
> TypeError: unsupported operand type(s) for &: 'str' and 'long'
>
> What am I doing wrong?
> Thanks very much for your help.
> Dan
>
>
Thanks a lot to you (and to James who replied off-list). I finally got
python to receive a message from node.js.
I used Michael's fork and the following sender and receiver:
[sender]
var sys = require('sys');
var amqp = require('./amqp');

var connection = amqp.createConnection({ host: 'localhost' });

// Wait for connection to become established.
connection.addListener('ready', function () {
    var x = connection.exchange()
    var q = connection.queue("aqueuename",
             { autoDelete: true, durable: false, exclusive: false });
    x.publish('aqueuename', {foo: "bar"});
});

[receiver]
#!/usr/bin/env python
import pika

connection = pika.AsyncoreConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

q = channel.queue_declare(queue='aqueuename',
                         auto_delete=True,
                         durable=False,
                         exclusive=False)

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

channel.basic_consume(callback,
                      queue='aqueuename',
                      no_ack=True)

pika.asyncore_loop()

One of the issues seemed to be the connection.end() that I had in the
sender. Having that in made this not work, but taking it out means the
sender doesn't exit after sending. But that's ok....I just wanted that so I
could have a simple test program; my real implementation will not exit
either, it will be a process that is always running, listening for messages
and sending them sometimes.

Now I have to actually learn rabbitmq and figure out what sort of messaging
strategies I want to use, but now I know it's possible to use node.js and
python together, so I'm ready to continue my project.

Thanks again
Dan
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20110122/d3c66878/attachment-0001.htm>


More information about the rabbitmq-discuss mailing list