[rabbitmq-discuss] Tornado Consumer example problem
Laing, Michael
michael.laing at nytimes.com
Thu Aug 15 15:25:23 BST 2013
The connect method doesn't apear to return a connection...
On Thu, Aug 15, 2013 at 9:51 AM, Priyanki Vashi <vashi.priyanki at gmail.com>wrote:
> Hi ,
>
> on_connection_open, is a callback method, which will be called when
> connection is opened with rabbitmq. It opens channel inturn. So it's little
> unlikely that it has 'None' value but I am not 100% sure.
>
> Here is my code. Could you quickly have a look.
> It's the same example from pika site for tornado consumer except that I
> have tried to use connection parameters instead of URL.
>
>
> from pika import adapters
> import pika
> import logging
>
> LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
> '-35s %(lineno) -5d: %(message)s')
> LOGGER = logging.getLogger(__name__)
>
>
> class ExampleConsumer(object):
> """This is an example consumer that will handle unexpected interactions
> with RabbitMQ such as channel and connection closures.
>
> If RabbitMQ closes the connection, it will reopen it. You should
> look at the output, as there are limited reasons why the connection may
> be closed, which usually are tied to permission related issues or
> socket timeouts.
>
> If the channel is closed, it will indicate a problem with one of the
> commands that were issued and that should surface in the output as
> well.
>
> """
> EXCHANGE = 'message'
> EXCHANGE_TYPE = 'topic'
> QUEUE = 'text'
> ROUTING_KEY = 'example.text'
>
>
>
>
> def __init__(self, host, port, username, password):
> """Setup the example publisher object, passing in the URL we will
> use
> to connect to RabbitMQ.
>
> :param str amqp_url: The URL for connecting to RabbitMQ
>
> """
>
> self._connection = None
> self._channel = None
> self._closing = False
> self._consumer_tag = None
> #self._url = amqp_url
>
> self.credentials = pika.PlainCredentials(username=username,
> password=password)
> self.parameters = pika.ConnectionParameters(host=host, port=port,
> credentials=self.credentials)
>
>
>
> def connect(self):
> """This method connects to RabbitMQ, returning the connection
> handle.
> When the connection is established, the on_connection_open method
> will be invoked by pika.
>
> :rtype: pika.SelectConnection
>
> """
> #LOGGER.info('Connecting to %s', self._url)
> adapters.TornadoConnection(self.parameters,
> on_open_callback=self.on_connection_open, stop_ioloop_on_close=False)
> #return
> adapters.TornadoConnection(pika.URLParameters(self._url),self.on_connection_open)
>
> def close_connection(self):
> """This method closes the connection to RabbitMQ."""
> LOGGER.info('Closing connection')
> self._connection.close()
>
> def add_on_connection_close_callback(self):
> """This method adds an on close callback that will be invoked by
> pika
> when RabbitMQ closes the connection to the publisher unexpectedly.
>
> """
> LOGGER.info('Adding connection close callback')
> self._connection.add_on_close_callback(self.on_connection_closed)
>
> def on_connection_closed(self, connection, reply_code, reply_text):
> """This method is invoked by pika when the connection to RabbitMQ
> is
> closed unexpectedly. Since it is unexpected, we will reconnect to
> RabbitMQ if it disconnects.
>
> :param pika.connection.Connection connection: The closed
> connection obj
> :param int reply_code: The server provided reply_code if given
> :param str reply_text: The server provided reply_text if given
>
> """
> self._channel = None
> if self._closing:
> self._connection.ioloop.stop()
> else:
> LOGGER.warning('Connection closed, reopening in 5 seconds:
> (%s) %s',
> reply_code, reply_text)
> self._connection.add_timeout(5, self.reconnect)
>
> def on_connection_open(self, unused_connection):
> """This method is called by pika once the connection to RabbitMQ
> has
> been established. It passes the handle to the connection object in
> case we need it, but in this case, we'll just mark it unused.
>
> :type unused_connection: pika.SelectConnection
>
> """
> LOGGER.info('Connection opened')
> self.add_on_connection_close_callback()
> self.open_channel()
>
> def reconnect(self):
> """Will be invoked by the IOLoop timer if the connection is
> closed. See the on_connection_closed method.
>
> """
> # This is the old connection IOLoop instance, stop its ioloop
> self._connection.ioloop.stop()
>
> if not self._closing:
>
> # Create a new connection
> self._connection = self.connect()
>
> # There is now a new connection, needs a new ioloop to run
> self._connection.ioloop.start()
>
> def add_on_channel_close_callback(self):
> """This method tells pika to call the on_channel_closed method if
> RabbitMQ unexpectedly closes the channel.
>
> """
> LOGGER.info('Adding channel close callback')
> self._channel.add_on_close_callback(self.on_channel_closed)
>
> def on_channel_closed(self, channel, reply_code, reply_text):
> """Invoked by pika when RabbitMQ unexpectedly closes the channel.
> Channels are usually closed if you attempt to do something that
> violates the protocol, such as re-declare an exchange or queue with
> different parameters. In this case, we'll close the connection
> to shutdown the object.
>
> :param pika.channel.Channel: The closed channel
> :param int reply_code: The numeric reason the channel was closed
> :param str reply_text: The text reason the channel was closed
>
> """
> LOGGER.warning('Channel %i was closed: (%s) %s',
> channel, reply_code, reply_text)
> self._connection.close()
>
> def on_channel_open(self, channel):
> """This method is invoked by pika when the channel has been opened.
> The channel object is passed in so we can make use of it.
>
> Since the channel is now open, we'll declare the exchange to use.
>
> :param pika.channel.Channel channel: The channel object
>
> """
> LOGGER.info('Channel opened')
> self._channel = channel
> self.add_on_channel_close_callback()
> self.setup_exchange(self.EXCHANGE)
>
> def setup_exchange(self, exchange_name):
> """Setup the exchange on RabbitMQ by invoking the Exchange.Declare
> RPC
> command. When it is complete, the on_exchange_declareok method will
> be invoked by pika.
>
> :param str|unicode exchange_name: The name of the exchange to
> declare
>
> """
> LOGGER.info('Declaring exchange %s', exchange_name)
> self._channel.exchange_declare(self.on_exchange_declareok,
> exchange_name,
> self.EXCHANGE_TYPE)
>
> def on_exchange_declareok(self, unused_frame):
> """Invoked by pika when RabbitMQ has finished the Exchange.Declare
> RPC
> command.
>
> :param pika.Frame.Method unused_frame: Exchange.DeclareOk response
> frame
>
> """
> LOGGER.info('Exchange declared')
> self.setup_queue(self.QUEUE)
>
> def setup_queue(self, queue_name):
> """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
> command. When it is complete, the on_queue_declareok method will
> be invoked by pika.
>
> :param str|unicode queue_name: The name of the queue to declare.
>
> """
> LOGGER.info('Declaring queue %s', queue_name)
> self._channel.queue_declare(self.on_queue_declareok, queue_name)
>
> def on_queue_declareok(self, method_frame):
> """Method invoked by pika when the Queue.Declare RPC call made in
> setup_queue has completed. In this method we will bind the queue
> and exchange together with the routing key by issuing the
> Queue.Bind
> RPC command. When this command is complete, the on_bindok method
> will
> be invoked by pika.
>
> :param pika.frame.Method method_frame: The Queue.DeclareOk frame
>
> """
> LOGGER.info('Binding %s to %s with %s',
> self.EXCHANGE, self.QUEUE, self.ROUTING_KEY)
> self._channel.queue_bind(self.on_bindok, self.QUEUE,
> self.EXCHANGE, self.ROUTING_KEY)
>
> def add_on_cancel_callback(self):
> """Add a callback that will be invoked if RabbitMQ cancels the
> consumer
> for some reason. If RabbitMQ does cancel the consumer,
> on_consumer_cancelled will be invoked by pika.
>
> """
> LOGGER.info('Adding consumer cancellation callback')
> self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
>
> def on_consumer_cancelled(self, method_frame):
> """Invoked by pika when RabbitMQ sends a Basic.Cancel for a
> consumer
> receiving messages.
>
> :param pika.frame.Method method_frame: The Basic.Cancel frame
>
> """
> LOGGER.info('Consumer was cancelled remotely, shutting down: %r',
> method_frame)
> if self._channel:
> self._channel.close()
>
> def acknowledge_message(self, delivery_tag):
> """Acknowledge the message delivery from RabbitMQ by sending a
> Basic.Ack RPC method for the delivery tag.
>
> :param int delivery_tag: The delivery tag from the Basic.Deliver
> frame
>
> """
> LOGGER.info('Acknowledging message %s', delivery_tag)
> self._channel.basic_ack(delivery_tag)
>
> def on_message(self, unused_channel, basic_deliver, properties, body):
> """Invoked by pika when a message is delivered from RabbitMQ. The
> channel is passed for your convenience. The basic_deliver object
> that
> is passed in carries the exchange, routing key, delivery tag and
> a redelivered flag for the message. The properties passed in is an
> instance of BasicProperties with the message properties and the
> body
> is the message that was sent.
>
> :param pika.channel.Channel unused_channel: The channel object
> :param pika.Spec.Basic.Deliver: basic_deliver method
> :param pika.Spec.BasicProperties: properties
> :param str|unicode body: The message body
>
> """
> LOGGER.info('Received message # %s from %s: %s',
> basic_deliver.delivery_tag, properties.app_id, body)
> self.acknowledge_message(basic_deliver.delivery_tag)
>
> def on_cancelok(self, unused_frame):
> """This method is invoked by pika when RabbitMQ acknowledges the
> cancellation of a consumer. At this point we will close the
> channel.
> This will invoke the on_channel_closed method once the channel has
> been
> closed, which will in-turn close the connection.
>
> :param pika.frame.Method unused_frame: The Basic.CancelOk frame
>
> """
> LOGGER.info('RabbitMQ acknowledged the cancellation of the
> consumer')
> self.close_channel()
>
> def stop_consuming(self):
> """Tell RabbitMQ that you would like to stop consuming by sending
> the
> Basic.Cancel RPC command.
>
> """
> if self._channel:
> LOGGER.info('Sending a Basic.Cancel RPC command to RabbitMQ')
> self._channel.basic_cancel(self.on_cancelok,
> self._consumer_tag)
>
> def start_consuming(self):
> """This method sets up the consumer by first calling
> add_on_cancel_callback so that the object is notified if RabbitMQ
> cancels the consumer. It then issues the Basic.Consume RPC command
> which returns the consumer tag that is used to uniquely identify
> the
> consumer with RabbitMQ. We keep the value to use it when we want to
> cancel consuming. The on_message method is passed in as a callback
> pika
> will invoke when a message is fully received.
>
> """
> LOGGER.info('Issuing consumer related RPC commands')
> self.add_on_cancel_callback()
> self._consumer_tag = self._channel.basic_consume(self.on_message,
> self.QUEUE)
>
> def on_bindok(self, unused_frame):
> """Invoked by pika when the Queue.Bind method has completed. At
> this
> point we will start consuming messages by calling start_consuming
> which will invoke the needed RPC commands to start the process.
>
> :param pika.frame.Method unused_frame: The Queue.BindOk response
> frame
>
> """
> LOGGER.info('Queue bound')
> self.start_consuming()
>
> def close_channel(self):
> """Call to close the channel with RabbitMQ cleanly by issuing the
> Channel.Close RPC command.
>
> """
> LOGGER.info('Closing the channel')
> self._channel.close()
>
> def open_channel(self):
> """Open a new channel with RabbitMQ by issuing the Channel.Open RPC
> command. When RabbitMQ responds that the channel is open, the
> on_channel_open callback will be invoked by pika.
>
> """
> LOGGER.info('Creating a new channel')
> self._connection.channel(on_open_callback=self.on_channel_open)
>
> def run(self):
> """Run the example consumer by connecting to RabbitMQ and then
> starting the IOLoop to block and allow the SelectConnection to
> operate.
>
> """
> self._connection = self.connect()
> self._connection.ioloop.start()
>
> def stop(self):
> """Cleanly shutdown the connection to RabbitMQ by stopping the
> consumer
> with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok
> will be invoked by pika, which will then closing the channel and
> connection. The IOLoop is started again because this method is
> invoked
> when CTRL-C is pressed raising a KeyboardInterrupt exception. This
> exception stops the IOLoop which needs to be running for pika to
> communicate with RabbitMQ. All of the commands issued prior to
> starting
> the IOLoop will be buffered but not processed.
>
> """
> LOGGER.info('Stopping')
> self._closing = True
> self.stop_consuming()
> self._connection.ioloop.start()
> LOGGER.info('Stopped')
>
>
> def main():
> logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
> example = ExampleConsumer('mqserver10', 5672, 'guest', 'guest')
> try:
> example.run()
> except KeyboardInterrupt:
> example.stop()
>
>
> if __name__ == '__main__':
> main()
>
>
> On Thu, Aug 15, 2013 at 3:25 PM, Ask Solem <ask at rabbitmq.com> wrote:
>
>>
>> On Aug 15, 2013, at 1:20 PM, Priyanki Vashi <vashi.priyanki at gmail.com>
>> wrote:
>>
>> > Hi There ,
>> >
>> > This might be very basic error but since I am trying to use Tornado
>> connection for the first time, help on this would be appreciated.
>> >
>> > I was trying to use Tornado consumer example given on following pika
>> site.
>> >
>> > https://pika.readthedocs.org/en/0.9.13/examples/tornado_consumer.html
>> >
>> > I am running into following error. The example is with URL connection
>> method.
>> > But I also tried to use hostname and port method as well as tried
>> changing to IP address instead of hostname but still it's the same error.
>> >
>> > I have pika 0.9.13 library installed. My consumer & Producer works fine
>> with select.connection method so I believe library is correctly installed.
>> >
>> > Do I need to do something additional to get tornado connection working
>> for both producer and consumer with pika?
>> >
>> > Please suggest.
>> >
>> > mq1 at mqserver1:~/Producer_Receiver/Latest$ python tornedo_c_1.py
>> > Traceback (most recent call last):
>> > File "tornedo_c_1.py", line 357, in <module>
>> > main()
>> > File "tornedo_c_1.py", line 351, in main
>> > example.run()
>> > File "tornedo_c_1.py", line 326, in run
>> > self._connection = self.connect()
>> > File "tornedo_c_1.py", line 59, in connect
>> > adapters.TornadoConnection(self.parameters,
>> self.on_connection_open, stop_ioloop_on_close=False)
>> > TypeError: 'NoneType' object is not callable
>>
>>
>> Maybe you passed None as a callback somewhere? What is the value of
>> on_connection_open for example?
>>
>> _______________________________________________
>> rabbitmq-discuss mailing list
>> rabbitmq-discuss at lists.rabbitmq.com
>> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>>
>
>
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss at lists.rabbitmq.com
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20130815/63ba333b/attachment.htm>
More information about the rabbitmq-discuss
mailing list