<br><br><div class="gmail_quote">On Sat, Jan 22, 2011 at 12:56 PM, Dan Tenenbaum <span dir="ltr"><<a href="mailto:dtenenba@fhcrc.org">dtenenba@fhcrc.org</a>></span> wrote:<br><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex;">
<br><br><div class="gmail_quote"><div><div></div><div class="h5">On Sat, Jan 22, 2011 at 2:46 AM, Michael Bridgen <span dir="ltr"><<a href="mailto:mikeb@rabbitmq.com" target="_blank">mikeb@rabbitmq.com</a>></span> wrote:<br>
<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
Hi Dan,<div><div></div><div><br>
<br>
<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
I am new to rabbitmq. I got the python "getting started" examples to<br>
work just fine.<br>
I would like to be able to send messages between node.js (using<br>
node.amqp) and python/pika.<br>
</blockquote>
<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
I can successfully receive a message in node.js using this code:<br>
<br>
[receiver - node.js]<br>
var sys = require('sys');<br>
var amqp = require('./amqp');<br>
<br>
var connection = amqp.createConnection({ host: 'localhost' });<br>
<br>
// Wait for connection to become established.<br>
connection.addListener('ready', function () {<br>
// Create a queue and bind to all messages.<br>
// Use the default 'amq.topic' exchange<br>
<br>
var q = connection.queue('my-queue');<br>
// Catch all messages<br>
q.bind('#');<br>
<br>
// Receive messages<br>
q.subscribe(function (message) {<br>
// Print messages to stdout<br>
sys.puts(sys.inspect(message));<br>
});<br>
});<br>
<br>
[sender - python/pika]<br>
#!/usr/bin/env python<br>
import pika<br>
<br>
connection = pika.AsyncoreConnection(pika.ConnectionParameters(<br>
host='localhost'))<br>
channel = connection.channel()<br>
<br>
<br>
channel.basic_publish(exchange='',<br>
routing_key='my-queue',<br>
body='Hello World!')<br>
print " [x] Sent 'Hello World!'"<br>
connection.close()<br>
</blockquote>
<br></div></div>
You should be aware that node-amqp, at least Ryan's original and most forks of it, makes some unorthodox choices for parameter defaults and behaviour, which to be fair are probably inherited from the EventMachine AMQP client.<br>
<br>
In particular, the exchange that's published or bound to, if none is given, is "amq.topic". This is directly at odds with AMQP, for which an empty exchange name means the "default exchange" defined in the specification; that is, route directly to the queue named in the routing key.<br>
<br>
So your example above is working by coincidence -- you don't need to bind the queue in the node.js code, since your Python code is effectively sending straight to the queue. I.e., the node.js code sets up<br>
<br>
(amq.topic) --"#"--> [ | | my-queue | | ]<br>
<br>
but the message sent in the Python code actually follows<br>
<br>
(default) --"my-queue"--> [ | | my-queue | | ]<div><div></div><div><br>
<br>
<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
But I can't go the other way. I'm not sure that the message sent by<br>
node.js is going anywhere. I've looked at the unit tests for node.amqp<br>
(they all pass) but there is something I am missing, probably to do with<br>
exchanges or something. I'd like to do something like this:<br>
<br>
[sender - node.js]<br>
var sys = require('sys');<br>
var amqp = require('./amqp');<br>
<br>
var connection = amqp.createConnection({ host: 'localhost' });<br>
<br>
// Wait for connection to become established.<br>
connection.addListener('ready', function () {<br>
// Publish without declaring a queue; with no exchange given,<br>
// node-amqp falls back to its 'amq.topic' exchange<br>
connection.publish("my-queue", {random_key:"this is my message"});<br>
connection.end();<br>
});<br>
<br>
[receiver - python/pika]<br>
#!/usr/bin/env python<br>
import pika<br>
import sys<br>
<br>
connection = pika.AsyncoreConnection(pika.ConnectionParameters(<br>
host='localhost'))<br>
channel = connection.channel()<br>
<br>
print ' [*] Waiting for messages. To exit press CTRL+C'<br>
<br>
def callback(ch, method, properties, body):<br>
print body,<br>
sys.stdout.flush()<br>
channel.basic_consume(callback,<br>
queue='my-queue',<br>
no_ack=True)<br>
<br>
pika.asyncore_loop()<br>
</blockquote>
<br></div></div>
This way around (presuming there's a restart in between runs), node.js sends to amq.topic, but nothing is bound there, so the message is discarded.<br>
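<div>Both directions can be reproduced with a toy, in-memory model of the routing rules. This is only a sketch: <code>ToyBroker</code> and <code>topic_matches</code> are hypothetical names made up for illustration, the model is not how RabbitMQ is implemented, and it is written for Python 3 rather than the Python 2 used in the quoted scripts.</div>
<pre>
```python
from collections import defaultdict


def topic_matches(pattern, key):
    """AMQP-style topic match: '*' is exactly one word, '#' is zero or more."""
    def match(p, k):
        if not p:
            return not k
        if p[0] == "#":
            # '#' either consumes nothing, or consumes one word and stays active.
            return match(p[1:], k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        return p[0] in ("*", k[0]) and match(p[1:], k[1:])
    return match(pattern.split("."), key.split("."))


class ToyBroker:
    """Hypothetical in-memory stand-in for a broker (illustration only)."""

    def __init__(self):
        self.queues = {}
        self.bindings = defaultdict(list)  # exchange name maps to (pattern, queue)

    def declare_queue(self, name):
        self.queues.setdefault(name, [])

    def bind(self, queue, exchange, pattern):
        self.bindings[exchange].append((pattern, queue))

    def publish(self, exchange, routing_key, body):
        """Deliver the message and return the names of the receiving queues."""
        if exchange == "":
            # Default exchange: the routing key is treated as the queue name.
            targets = [routing_key] if routing_key in self.queues else []
        else:
            # Named exchanges are modelled as topic exchanges in this toy.
            targets = [q for pattern, q in self.bindings[exchange]
                       if topic_matches(pattern, routing_key)]
        for q in targets:
            self.queues[q].append(body)
        return targets


# Python sender, node.js receiver: the '#' binding on amq.topic is never used.
broker = ToyBroker()
broker.declare_queue("my-queue")
broker.bind("my-queue", "amq.topic", "#")
python_to_node = broker.publish("", "my-queue", "Hello World!")

# node.js sender after a restart: amq.topic has no bindings, so nothing arrives.
fresh = ToyBroker()
fresh.declare_queue("my-queue")
node_to_python_unbound = fresh.publish("amq.topic", "my-queue", "hi")

# Once the queue is bound to amq.topic, the same publish is delivered.
fresh.bind("my-queue", "amq.topic", "#")
node_to_python_bound = fresh.publish("amq.topic", "my-queue", "hi")
```
</pre>
<div>In the first case the message reaches the queue through the default exchange alone, which is why the binding made on the node.js side played no part.</div>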
<br>
You might try binding the queue, in the Python code, to the exchange "amq.topic". You should probably also declare the queue there.<br>
<br>
If you actually don't want to use "amq.topic", and you probably don't if you can help it, you might declare your own exchange and use that.<br>
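<div>As a rough sketch of that suggestion on the Python side: the helper below declares an exchange and a queue, binds them, and registers a consumer. Several things here are assumptions rather than anything stated in this thread. It uses the method and keyword names of current pika releases (<code>BlockingConnection</code>, <code>on_message_callback</code>, <code>auto_ack</code>), which differ from the older <code>AsyncoreConnection</code> API in the quoted scripts. The names <code>setup_receiver</code>, <code>print_body</code> and <code>main</code> are made up for illustration. The exchange type is assumed to be "topic"; it has to agree with whatever the node.js side declares, otherwise the broker will refuse the second declaration.</div>
<pre>
```python
def setup_receiver(channel, on_message, exchange="my-exchange",
                   queue="my-queue", exchange_type="topic"):
    """Declare exchange and queue, bind them, and register a consumer.

    `channel` is expected to behave like a pika 1.x BlockingChannel.
    """
    channel.exchange_declare(exchange=exchange, exchange_type=exchange_type)
    channel.queue_declare(queue=queue)
    # Bind with the routing key the sender is expected to publish with
    # (assumed here to be the queue name, as in the quoted sender).
    channel.queue_bind(queue=queue, exchange=exchange, routing_key=queue)
    channel.basic_consume(queue=queue, on_message_callback=on_message,
                          auto_ack=True)


def print_body(ch, method, properties, body):
    """Callback with the argument order pika passes to consumers."""
    print(" [x] Received %r" % (body,))


def main():
    # Third-party import kept local so the helper above has no hard dependency.
    import pika
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    setup_receiver(channel, print_body)
    print(" [*] Waiting for messages. To exit press CTRL+C")
    channel.start_consuming()
```
</pre>
<div>Calling <code>main()</code> needs pika installed and a broker running on localhost. Passing every argument by keyword keeps the call independent of parameter order.</div>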
<br></blockquote><div><br></div></div></div><div>OK, I am trying this approach, with the following sender:</div><div class="im"><div><br></div><div><br></div><div><font face="'courier new', monospace">var sys = require('sys');</font></div>
<div><font face="'courier new', monospace">var amqp = require('./amqp');</font></div><div><font face="'courier new', monospace"><br></font></div><div>
<font face="'courier new', monospace">var connection = amqp.createConnection({ host: 'localhost' });</font></div><div><font face="'courier new', monospace"><br>
</font></div><div><font face="'courier new', monospace">// Wait for connection to become established.</font></div><div><font face="'courier new', monospace">connection.addListener('ready', function () {</font></div>
<div><font face="'courier new', monospace"> // Declare a named exchange and publish through it,</font></div><div><font face="'courier new', monospace"> // instead of relying on the default 'amq.topic' exchange</font></div>
</div><div><font face="'courier new', monospace"> var exchange = connection.exchange("my-exchange");</font></div><div><font face="'courier new', monospace"> exchange.publish("my-queue", {random_key:"this is my message"});</font></div>
<div><font face="'courier new', monospace"> connection.end();</font></div><div><font face="'courier new', monospace"> </font></div><div><font face="'courier new', monospace">});</font></div>
<div><br></div><div>and receiver:<br><br></div><div class="im"><div><font face="'courier new', monospace">#!/usr/bin/env python</font></div><div><font face="'courier new', monospace">import pika</font></div>
<div><font face="'courier new', monospace">import sys</font></div><div><font face="'courier new', monospace"><br></font></div><div><font face="'courier new', monospace"><br>
</font></div><div><font face="'courier new', monospace">connection = pika.AsyncoreConnection(pika.ConnectionParameters(</font></div><div><font face="'courier new', monospace"> host='localhost'))</font></div>
<div><font face="'courier new', monospace">channel = connection.channel()</font></div></div><div><font face="'courier new', monospace">result = channel.queue_declare('my-queue')</font></div>
<div><font face="'courier new', monospace">channel.exchange_declare(exchange='my-exchange')</font></div><div><font face="'courier new', monospace">channel.queue_bind(exchange='my-exchange', queue=result.queue)</font></div>
<div class="im">
<div><font face="'courier new', monospace"><br></font></div><div><font face="'courier new', monospace"><br></font></div><div><font face="'courier new', monospace">print ' [*] Waiting for messages. To exit press CTRL+C'</font></div>
<div><font face="'courier new', monospace"><br></font></div><div><font face="'courier new', monospace">def callback(ch, method, properties, body):</font></div>
<div><font face="'courier new', monospace"> print body,</font></div><div><font face="'courier new', monospace"> sys.stdout.flush()</font></div><div><font face="'courier new', monospace"> </font></div>
<div><font face="'courier new', monospace">channel.basic_consume(callback,</font></div><div><font face="'courier new', monospace"> queue='my-queue',</font></div>
<div><font face="'courier new', monospace"> no_ack=True)</font></div><div><font face="'courier new', monospace"><br></font></div><div>
<font face="'courier new', monospace">pika.asyncore_loop()</font></div><div><br></div></div><div>When I run the receiver, I get:</div><div><br></div><div><font face="'courier new', monospace">% python receiver.py </font></div>
<div><font face="'courier new', monospace">Traceback (most recent call last):</font></div><div><font face="'courier new', monospace"> File "receiver.py", line 9, in <module></font></div>
<div><font face="'courier new', monospace"> result = channel.queue_declare('my-queue')</font></div><div><font face="'courier new', monospace"> File "build/bdist.macosx-10.6-universal/egg/pika/spec.py", line 3003, in queue_declare</font></div>
<div><font face="'courier new', monospace"> File "build/bdist.macosx-10.6-universal/egg/pika/channel.py", line 187, in _rpc</font></div><div><font face="'courier new', monospace"> File "build/bdist.macosx-10.6-universal/egg/pika/connection.py", line 325, in _rpc</font></div>
<div><font face="'courier new', monospace"> File "build/bdist.macosx-10.6-universal/egg/pika/connection.py", line 301, in send_method</font></div><div><font face="'courier new', monospace"> File "build/bdist.macosx-10.6-universal/egg/pika/connection.py", line 295, in send_frame</font></div>
<div><font face="'courier new', monospace"> File "build/bdist.macosx-10.6-universal/egg/pika/codec.py", line 74, in marshal</font></div><div><font face="'courier new', monospace"> File "build/bdist.macosx-10.6-universal/egg/pika/spec.py", line 673, in encode</font></div>
<div><font face="'courier new', monospace">TypeError: unsupported operand type(s) for &: 'str' and 'long' </font></div><div><br></div><div>What am I doing wrong?</div><div>
Thanks very much for your help.</div><div>Dan</div><div><br></div></div>
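<div>One plausible reading of the traceback, offered as a guess rather than a confirmed diagnosis: the string 'my-queue' was passed positionally and landed in a numeric first parameter instead of the queue name. The stand-in below is hypothetical (it is not pika's code, and the parameter list is an assumption inferred from the error message); it only shows how that kind of mix-up produces a TypeError on <code>&</code>, and how naming the argument avoids it. It is written for Python 3, where the message mentions 'int' instead of 'long'.</div>
<pre>
```python
def queue_declare(ticket=0, queue="", passive=False, durable=False):
    """Hypothetical stand-in whose first parameter is a numeric ticket."""
    # Encoding masks the ticket to 16 bits, which only works for integers.
    encoded_ticket = ticket & 0xFFFF
    return {"ticket": encoded_ticket, "queue": queue}


try:
    queue_declare("my-queue")  # the string lands in `ticket`, not `queue`
    error = None
except TypeError as exc:
    error = str(exc)

# Naming the argument puts the string where it belongs.
ok = queue_declare(queue="my-queue")
```
</pre>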
</blockquote></div><br><div>Thanks a lot to you (and to James, who replied off-list). I finally got Python to receive a message from node.js.</div><div>I used Michael's fork and the following sender and receiver:</div>
<div>[sender]</div><div><div><font class="Apple-style-span" face="'courier new', monospace">var sys = require('sys');</font></div><div><font class="Apple-style-span" face="'courier new', monospace">var amqp = require('./amqp');</font></div>
<div><font class="Apple-style-span" face="'courier new', monospace"><br></font></div><div><font class="Apple-style-span" face="'courier new', monospace">var connection = amqp.createConnection({ host: 'localhost' });</font></div>
<div><font class="Apple-style-span" face="'courier new', monospace"><br></font></div><div><font class="Apple-style-span" face="'courier new', monospace">// Wait for connection to become established.</font></div>
<div><font class="Apple-style-span" face="'courier new', monospace">connection.addListener('ready', function () {</font></div><div><font class="Apple-style-span" face="'courier new', monospace"> var x = connection.exchange()</font></div>
<div><font class="Apple-style-span" face="'courier new', monospace"> var q = connection.queue("aqueuename",</font></div><div><font class="Apple-style-span" face="'courier new', monospace"> { autoDelete: true, durable: false, exclusive: false });</font></div>
<div><font class="Apple-style-span" face="'courier new', monospace"> x.publish('aqueuename', {foo: "bar"});</font></div><div><font class="Apple-style-span" face="'courier new', monospace">});</font></div>
</div><div><font class="Apple-style-span" face="'courier new', monospace"><br></font></div><div><font class="Apple-style-span" face="'courier new', monospace"><span class="Apple-style-span" style="font-family: arial; ">[receiver]</span></font></div>
<div><div><font class="Apple-style-span" face="'courier new', monospace">#!/usr/bin/env python</font></div><div><font class="Apple-style-span" face="'courier new', monospace">import pika</font></div><div><font class="Apple-style-span" face="'courier new', monospace"><br>
</font></div><div><font class="Apple-style-span" face="'courier new', monospace">connection = pika.AsyncoreConnection(pika.ConnectionParameters(</font></div><div><font class="Apple-style-span" face="'courier new', monospace"> host='localhost'))</font></div>
<div><font class="Apple-style-span" face="'courier new', monospace">channel = connection.channel()</font></div><div><font class="Apple-style-span" face="'courier new', monospace"><br></font></div><div><font class="Apple-style-span" face="'courier new', monospace">q = channel.queue_declare(queue='aqueuename',</font></div>
<div><font class="Apple-style-span" face="'courier new', monospace"> auto_delete=True,</font></div><div><font class="Apple-style-span" face="'courier new', monospace"> durable=False,</font></div>
<div><font class="Apple-style-span" face="'courier new', monospace"> exclusive=False)</font></div><div><font class="Apple-style-span" face="'courier new', monospace"><br></font></div>
<div><font class="Apple-style-span" face="'courier new', monospace">print ' [*] Waiting for messages. To exit press CTRL+C'</font></div><div><font class="Apple-style-span" face="'courier new', monospace"><br>
</font></div><div><font class="Apple-style-span" face="'courier new', monospace">def callback(ch, method, properties, body):</font></div><div><font class="Apple-style-span" face="'courier new', monospace"> print " [x] Received %r" % (body,)</font></div>
<div><font class="Apple-style-span" face="'courier new', monospace"><br></font></div><div><font class="Apple-style-span" face="'courier new', monospace">channel.basic_consume(callback,</font></div><div><font class="Apple-style-span" face="'courier new', monospace"> queue='aqueuename',</font></div>
<div><font class="Apple-style-span" face="'courier new', monospace"> no_ack=True)</font></div><div><font class="Apple-style-span" face="'courier new', monospace"><br></font></div><div>
<font class="Apple-style-span" face="'courier new', monospace">pika.asyncore_loop()</font></div></div><div><br></div><div>One of the issues seemed to be the connection.end() call that I had in the sender. With it in place the message never arrived; without it the message gets through, but the sender no longer exits after sending. That's OK, though: I only wanted it to exit so I could have a simple test program. My real implementation will not exit either; it will be a long-running process that listens for messages and sometimes sends them.</div>
<div><br></div><div>Now I have to actually learn RabbitMQ and figure out which messaging strategies I want to use, but at least I know that node.js and Python can work together, so I'm ready to continue my project.</div>
<div><br></div><div>Thanks again</div><div>Dan</div><div><br></div>