[rabbitmq-discuss] pika and node.amqp interop
Dan Tenenbaum
dtenenba at fhcrc.org
Sun Jan 23 05:00:01 GMT 2011
On Sat, Jan 22, 2011 at 12:56 PM, Dan Tenenbaum <dtenenba at fhcrc.org> wrote:
>
>
> On Sat, Jan 22, 2011 at 2:46 AM, Michael Bridgen <mikeb at rabbitmq.com> wrote:
>
>> Hi Dan,
>>
>>
>>> I am new to rabbitmq. I got the python "getting started" examples to
>>> work just fine.
>>> I would like to be able to send messages between node.js (using
>>> node.amqp) and python/pika.
>>>
>>> I can successfully receive a message in node.js using this code:
>>>
>>> [receiver - node.js]
>>> var sys = require('sys');
>>> var amqp = require('./amqp');
>>>
>>> var connection = amqp.createConnection({ host: 'localhost' });
>>>
>>> // Wait for connection to become established.
>>> connection.addListener('ready', function () {
>>>   // Create a queue and bind to all messages.
>>>   // Use the default 'amq.topic' exchange
>>>
>>>   var q = connection.queue('my-queue');
>>>   // Catch all messages
>>>   q.bind('#');
>>>
>>>   // Receive messages
>>>   q.subscribe(function (message) {
>>>     // Print messages to stdout
>>>     sys.puts(sys.inspect(message));
>>>   });
>>> });
>>>
>>> [sender - python/pika]
>>> #!/usr/bin/env python
>>> import pika
>>>
>>> connection = pika.AsyncoreConnection(pika.ConnectionParameters(
>>>         host='localhost'))
>>> channel = connection.channel()
>>>
>>>
>>> channel.basic_publish(exchange='',
>>>                       routing_key='my-queue',
>>>                       body='Hello World!')
>>> print " [x] Sent 'Hello World!'"
>>> connection.close()
>>>
>>
>> You should be aware that node-amqp, at least Ryan's original and most
>> forks of it, make some unorthodox choices for parameter defaults and
>> behaviour, which to be fair are probably inherited from the EventMachine
>> AMQP client.
>>
>> In particular, the exchange that's published or bound to, if none is
>> given, is "amq.topic". This is directly at odds with AMQP, for which an
>> empty exchange name means the "default exchange" defined in the
>> specification; that is, route directly to the queue named in the routing
>> key.
>>
>> So your example above is working by coincidence -- you don't need to bind
>> the queue in the node.js code, since your Python code is effectively sending
>> straight to the queue. I.e., the node.js code sets up
>>
>> (amq.topic) --"#"--> [ | | my-queue | | ]
>>
>> but the message sent in the Python code actually follows
>>
>> (default) --"my-queue"--> [ | | my-queue | | ]
>>
>>
>>> But I can't go the other way. I'm not sure that the message sent by
>>> node.js is going anywhere. I've looked at the unit tests for node.amqp
>>> (they all pass) but there is something I am missing, probably to do with
>>> exchanges or something. I'd like to do something like this:
>>>
>>> [sender - node.js]
>>> var sys = require('sys');
>>> var amqp = require('./amqp');
>>>
>>> var connection = amqp.createConnection({ host: 'localhost' });
>>>
>>> // Wait for connection to become established.
>>> connection.addListener('ready', function () {
>>>   // Publish with routing key "my-queue"; no exchange is named here,
>>>   // so node-amqp falls back to its default 'amq.topic' exchange.
>>>   connection.publish("my-queue", {random_key:"this is my message"});
>>>   connection.end();
>>> });
>>>
>>> [receiver - python/pika]
>>> #!/usr/bin/env python
>>> import pika
>>> import sys
>>>
>>> connection = pika.AsyncoreConnection(pika.ConnectionParameters(
>>>         host='localhost'))
>>> channel = connection.channel()
>>>
>>> print ' [*] Waiting for messages. To exit press CTRL+C'
>>>
>>> def callback(ch, method, properties, body):
>>>     print body,
>>>     sys.stdout.flush()
>>> channel.basic_consume(callback,
>>>                       queue='my-queue',
>>>                       no_ack=True)
>>>
>>> pika.asyncore_loop()
>>>
>>
>> This way around (presuming there's a restart in-between runs), node.js
>> sends to amq.topic, but nothing is bound there, so the message is discarded.
>>
>> You might try binding the queue, in the Python, to the exchange
>> "amq.topic". You should probably also declare it.
>>
>> If you actually don't want to use "amq.topic", and you probably don't if
>> you can help it, you might declare your own exchange and use that.
>>
>>
> OK, I am trying this approach, with the following sender:
>
>
> var sys = require('sys');
> var amqp = require('./amqp');
>
> var connection = amqp.createConnection({ host: 'localhost' });
>
> // Wait for connection to become established.
> connection.addListener('ready', function () {
>   // Publish through the named exchange "my-exchange"
>   // instead of the default 'amq.topic' exchange.
>   var exchange = connection.exchange("my-exchange");
>   exchange.publish("my-queue", {random_key:"this is my message"});
>   connection.end();
>
> });
>
> and receiver:
>
> #!/usr/bin/env python
> import pika
> import sys
>
>
> connection = pika.AsyncoreConnection(pika.ConnectionParameters(
>         host='localhost'))
> channel = connection.channel()
> result = channel.queue_declare('my-queue')
> channel.exchange_declare(exchange='my-exchange')
> channel.queue_bind(exchange='my-exchange', queue=result.queue)
>
>
> print ' [*] Waiting for messages. To exit press CTRL+C'
>
> def callback(ch, method, properties, body):
>     print body,
>     sys.stdout.flush()
>
> channel.basic_consume(callback,
>                       queue='my-queue',
>                       no_ack=True)
>
> pika.asyncore_loop()
>
> When I run the receiver, I get:
>
> % python receiver.py
> Traceback (most recent call last):
> File "receiver.py", line 9, in <module>
> result = channel.queue_declare('my-queue')
> File "build/bdist.macosx-10.6-universal/egg/pika/spec.py", line 3003, in
> queue_declare
> File "build/bdist.macosx-10.6-universal/egg/pika/channel.py", line 187,
> in _rpc
> File "build/bdist.macosx-10.6-universal/egg/pika/connection.py", line
> 325, in _rpc
> File "build/bdist.macosx-10.6-universal/egg/pika/connection.py", line
> 301, in send_method
> File "build/bdist.macosx-10.6-universal/egg/pika/connection.py", line
> 295, in send_frame
> File "build/bdist.macosx-10.6-universal/egg/pika/codec.py", line 74, in
> marshal
> File "build/bdist.macosx-10.6-universal/egg/pika/spec.py", line 673, in
> encode
> TypeError: unsupported operand type(s) for &: 'str' and 'long'
>
> What am I doing wrong?
> Thanks very much for your help.
> Dan
>
>
Thanks a lot to you (and to James who replied off-list). I finally got
python to receive a message from node.js.
I used Michael's fork and the following sender and receiver:
[sender]
var sys = require('sys');
var amqp = require('./amqp');

var connection = amqp.createConnection({ host: 'localhost' });

// Wait for connection to become established.
connection.addListener('ready', function () {
  var x = connection.exchange();
  var q = connection.queue("aqueuename",
    { autoDelete: true, durable: false, exclusive: false });
  x.publish('aqueuename', {foo: "bar"});
});

[receiver]
#!/usr/bin/env python
import pika

connection = pika.AsyncoreConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
q = channel.queue_declare(queue='aqueuename',
                          auto_delete=True,
                          durable=False,
                          exclusive=False)

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

channel.basic_consume(callback,
                      queue='aqueuename',
                      no_ack=True)

pika.asyncore_loop()
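
The routing difference Michael describes earlier in the thread can be
modelled without a broker. What follows is only a sketch in plain Python:
ToyBroker, topic_match and their methods are names invented here for
illustration, not part of pika or node-amqp, and the model covers just the
default exchange and topic bindings.

```python
def topic_match(pattern, key):
    """Match an AMQP-style topic pattern ('*' = one word, '#' = zero or more)."""
    p = pattern.split('.')
    k = key.split('.') if key else []

    def match(i, j):
        if i == len(p):
            return j == len(k)
        if p[i] == '#':
            # '#' may absorb any number of remaining words, including none.
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j < len(k) and p[i] in ('*', k[j]):
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


class ToyBroker:
    """Hypothetical in-memory stand-in for a broker; illustration only."""

    def __init__(self):
        self.queues = {}    # queue name -> list of delivered bodies
        self.bindings = {}  # exchange name -> list of (pattern, queue name)

    def declare_queue(self, name):
        self.queues.setdefault(name, [])

    def bind(self, queue, exchange, pattern):
        self.bindings.setdefault(exchange, []).append((pattern, queue))

    def publish(self, exchange, routing_key, body):
        """Deliver body and return how many queues received it."""
        if exchange == '':
            # Default exchange: route straight to the queue named by the key.
            targets = {routing_key} & set(self.queues)
        else:
            targets = {q for pat, q in self.bindings.get(exchange, [])
                       if topic_match(pat, routing_key)}
        for name in targets:
            self.queues[name].append(body)
        return len(targets)


broker = ToyBroker()
broker.declare_queue('my-queue')

# Python sender: empty exchange name, routing key equal to the queue name.
sent_default = broker.publish('', 'my-queue', 'Hello World!')

# node.js sender before any binding exists: amq.topic has nowhere to route.
sent_unbound = broker.publish('amq.topic', 'my-queue', 'from node')

# After binding the queue to amq.topic with '#', the same publish arrives.
broker.bind('my-queue', 'amq.topic', '#')
sent_bound = broker.publish('amq.topic', 'my-queue', 'from node')

print(sent_default, sent_unbound, sent_bound)  # prints: 1 0 1
```

The working sender above presumably corresponds to the first case: an
exchange obtained with no name, plus a routing key equal to the queue name,
needs no binding at all.
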
One of the issues seemed to be the connection.end() call that I had in the
sender. With it in place the message never arrived; without it the sender
doesn't exit after sending. That's OK: I only wanted the sender to exit so I
could have a simple test program. My real implementation will not exit
either; it will be a long-running process that listens for messages and
sometimes sends them.
Now I have to actually learn RabbitMQ and figure out what sort of messaging
strategies I want to use, but I know it's possible to use node.js and
Python together, so I'm ready to continue my project.
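
A note on the earlier TypeError, offered as a guess rather than a diagnosis:
the failing receiver passed 'my-queue' positionally, while the working one
uses queue='aqueuename'. If the first positional parameter of that pika
version's queue_declare is a numeric field rather than the queue name (an
assumption; the traceback only shows the failure happening during encoding),
the string would end up being packed as a number. The stub below is a
hypothetical stand-in, not pika's code, and only illustrates that mechanism.

```python
import struct


def queue_declare_stub(ticket=0, queue=''):
    """Hypothetical stand-in, NOT pika's real method.

    Assumes a numeric 'ticket' parameter precedes 'queue' and is encoded
    as an unsigned 16-bit integer, followed by a length-prefixed name.
    """
    encoded_ticket = struct.pack('>H', ticket)  # fails if ticket is a string
    name = queue.encode('utf-8')
    return encoded_ticket + struct.pack('B', len(name)) + name


def try_declare(*args, **kwargs):
    """Return 'ok', or the name of the exception the stub raised."""
    try:
        queue_declare_stub(*args, **kwargs)
        return 'ok'
    except (struct.error, TypeError) as exc:
        return type(exc).__name__


# Positional: 'my-queue' lands in the numeric slot and encoding fails.
positional_result = try_declare('my-queue')

# Keyword: the name reaches the intended parameter and encoding succeeds.
keyword_result = try_declare(queue='my-queue')

print(positional_result, keyword_result)
```

Passing the queue name by keyword, as the working receiver does, avoids
depending on parameter order at all.
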
Thanks again
Dan