[rabbitmq-discuss] Tornado Consumer example problem

Laing, Michael michael.laing at nytimes.com
Thu Aug 15 17:19:49 BST 2013


Sorry, that got sent a little early...

run:

from pika import adapters
if adapters.TornadoConnection:
    print 'y'
else:
    print 'n'
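
A slightly more defensive version of the same check, as a sketch only. It assumes that pika leaves adapters.TornadoConnection set to None (or absent) when the tornado package cannot be imported, which would explain a "'NoneType' object is not callable" error on the line that calls it. The function name tornado_adapter_status is made up for illustration, and the snippet uses print() calls rather than the Python 2 print statement used above.

```python
def tornado_adapter_status():
    """Report whether pika's Tornado adapter looks usable in this environment."""
    try:
        from pika import adapters
    except ImportError:
        return 'pika is not installed'
    # Assumption: the attribute is None (or absent) when tornado is missing.
    if getattr(adapters, 'TornadoConnection', None) is None:
        return 'TornadoConnection is unavailable (is tornado installed?)'
    return 'TornadoConnection is available'


print(tornado_adapter_status())
```

If this reports the adapter as unavailable, installing tornado into the same Python environment that runs the consumer would be the first thing to try.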


On Thu, Aug 15, 2013 at 12:18 PM, Laing, Michael
<michael.laing at nytimes.com> wrote:

> Is tornado installed?
>
> Run this Python fragment:
>
> from pika import adapters
> if adapters.TornadoConnection:
>     print 'y'
>
>
>
> On Thu, Aug 15, 2013 at 11:00 AM, Priyanki Vashi
> <vashi.priyanki at gmail.com> wrote:
>
>> Hi,
>>
>> Sorry to say, but I have now made connect return the connection and I
>> still get the same error.
>>
>> Does anyone have a working example of the Tornado connection (either
>> consumer or producer) with the pika client that they could share with me?
>> Even a very basic example would be fine.
>>
>> Thank you in advance !
>>
>> Best Regards,
>> Priyanki.
>>
>>
>> On Thu, Aug 15, 2013 at 4:25 PM, Laing, Michael <
>> michael.laing at nytimes.com> wrote:
>>
>>> The connect method doesn't appear to return a connection...
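
A toy illustration of that point, not the pika API: a Python method with no return statement returns None, so run() would store None in self._connection and the following self._connection.ioloop.start() would fail with an AttributeError. StubConnection and both consumer classes are hypothetical stand-ins so the sketch runs without RabbitMQ; note that this is a different failure from the TypeError in the original report.

```python
class StubConnection(object):
    """Hypothetical stand-in for adapters.TornadoConnection (no network I/O)."""

    def __init__(self, parameters, on_open_callback=None):
        self.parameters = parameters
        self.on_open_callback = on_open_callback


class ConsumerWithoutReturn(object):
    def on_connection_open(self, connection):
        pass

    def connect(self):
        # The connection object is created and then discarded.
        StubConnection('parameters', on_open_callback=self.on_connection_open)


class ConsumerWithReturn(ConsumerWithoutReturn):
    def connect(self):
        # Hand the connection back so the caller can store it.
        return StubConnection('parameters',
                              on_open_callback=self.on_connection_open)


print(ConsumerWithoutReturn().connect())              # None
print(type(ConsumerWithReturn().connect()).__name__)  # StubConnection
```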
>>>
>>>
>>> On Thu, Aug 15, 2013 at 9:51 AM, Priyanki Vashi <
>>> vashi.priyanki at gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> on_connection_open is a callback method, which will be called when the
>>>> connection to RabbitMQ is opened. It in turn opens the channel. So it is
>>>> unlikely to have the value None, but I am not 100% sure.
>>>>
>>>> Here is my code. Could you have a quick look?
>>>> It's the same Tornado consumer example from the pika site, except that I
>>>> have tried to use connection parameters instead of a URL.
>>>>
>>>>
>>>> from pika import adapters
>>>> import pika
>>>> import logging
>>>>
>>>> LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
>>>>               '-35s %(lineno) -5d: %(message)s')
>>>> LOGGER = logging.getLogger(__name__)
>>>>
>>>>
>>>> class ExampleConsumer(object):
>>>>     """This is an example consumer that will handle unexpected interactions
>>>>     with RabbitMQ such as channel and connection closures.
>>>>
>>>>     If RabbitMQ closes the connection, it will reopen it. You should
>>>>     look at the output, as there are limited reasons why the connection may
>>>>     be closed, which usually are tied to permission related issues or
>>>>     socket timeouts.
>>>>
>>>>     If the channel is closed, it will indicate a problem with one of the
>>>>     commands that were issued and that should surface in the output as well.
>>>>
>>>>     """
>>>>     EXCHANGE = 'message'
>>>>     EXCHANGE_TYPE = 'topic'
>>>>     QUEUE = 'text'
>>>>     ROUTING_KEY = 'example.text'
>>>>
>>>>
>>>>
>>>>
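>>>>     def __init__(self, host, port, username, password):
>>>>         """Setup the example consumer object, passing in the connection
>>>>         details we will use to connect to RabbitMQ.
>>>>
>>>>         :param str host: The RabbitMQ host name or IP address
>>>>         :param int port: The RabbitMQ port
>>>>         :param str username: The user name to authenticate with
>>>>         :param str password: The password to authenticate with
>>>>
>>>>         """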
>>>>
>>>>         self._connection = None
>>>>         self._channel = None
>>>>         self._closing = False
>>>>         self._consumer_tag = None
>>>>         #self._url = amqp_url
>>>>
>>>>         self.credentials = pika.PlainCredentials(
>>>>             username=username, password=password)
>>>>         self.parameters = pika.ConnectionParameters(
>>>>             host=host, port=port, credentials=self.credentials)
>>>>
>>>>
>>>>
>>>>     def connect(self):
>>>>         """This method connects to RabbitMQ, returning the connection handle.
>>>>         When the connection is established, the on_connection_open method
>>>>         will be invoked by pika.
>>>>
>>>>         :rtype: pika.SelectConnection
>>>>
>>>>         """
>>>>         #LOGGER.info('Connecting to %s', self._url)
>>>>         adapters.TornadoConnection(
>>>>             self.parameters,
>>>>             on_open_callback=self.on_connection_open,
>>>>             stop_ioloop_on_close=False)
>>>>         #return adapters.TornadoConnection(pika.URLParameters(self._url),
>>>>         #                                  self.on_connection_open)
>>>>
>>>>     def close_connection(self):
>>>>         """This method closes the connection to RabbitMQ."""
>>>>         LOGGER.info('Closing connection')
>>>>         self._connection.close()
>>>>
>>>>     def add_on_connection_close_callback(self):
>>>>         """This method adds an on close callback that will be invoked by pika
>>>>         when RabbitMQ closes the connection to the publisher unexpectedly.
>>>>
>>>>         """
>>>>         LOGGER.info('Adding connection close callback')
>>>>         self._connection.add_on_close_callback(self.on_connection_closed)
>>>>
>>>>     def on_connection_closed(self, connection, reply_code, reply_text):
>>>>         """This method is invoked by pika when the connection to RabbitMQ is
>>>>         closed unexpectedly. Since it is unexpected, we will reconnect to
>>>>         RabbitMQ if it disconnects.
>>>>
>>>>         :param pika.connection.Connection connection: The closed connection obj
>>>>         :param int reply_code: The server provided reply_code if given
>>>>         :param str reply_text: The server provided reply_text if given
>>>>
>>>>         """
>>>>         self._channel = None
>>>>         if self._closing:
>>>>             self._connection.ioloop.stop()
>>>>         else:
>>>>             LOGGER.warning('Connection closed, reopening in 5 seconds: (%s) %s',
>>>>                            reply_code, reply_text)
>>>>             self._connection.add_timeout(5, self.reconnect)
>>>>
>>>>     def on_connection_open(self, unused_connection):
>>>>         """This method is called by pika once the connection to RabbitMQ has
>>>>         been established. It passes the handle to the connection object in
>>>>         case we need it, but in this case, we'll just mark it unused.
>>>>
>>>>         :type unused_connection: pika.SelectConnection
>>>>
>>>>         """
>>>>         LOGGER.info('Connection opened')
>>>>         self.add_on_connection_close_callback()
>>>>         self.open_channel()
>>>>
>>>>     def reconnect(self):
>>>>         """Will be invoked by the IOLoop timer if the connection is
>>>>         closed. See the on_connection_closed method.
>>>>
>>>>         """
>>>>         # This is the old connection IOLoop instance, stop its ioloop
>>>>         self._connection.ioloop.stop()
>>>>
>>>>         if not self._closing:
>>>>
>>>>             # Create a new connection
>>>>             self._connection = self.connect()
>>>>
>>>>             # There is now a new connection, needs a new ioloop to run
>>>>             self._connection.ioloop.start()
>>>>
>>>>     def add_on_channel_close_callback(self):
>>>>         """This method tells pika to call the on_channel_closed method if
>>>>         RabbitMQ unexpectedly closes the channel.
>>>>
>>>>         """
>>>>         LOGGER.info('Adding channel close callback')
>>>>         self._channel.add_on_close_callback(self.on_channel_closed)
>>>>
>>>>     def on_channel_closed(self, channel, reply_code, reply_text):
>>>>         """Invoked by pika when RabbitMQ unexpectedly closes the channel.
>>>>         Channels are usually closed if you attempt to do something that
>>>>         violates the protocol, such as re-declare an exchange or queue with
>>>>         different parameters. In this case, we'll close the connection
>>>>         to shutdown the object.
>>>>
>>>>         :param pika.channel.Channel: The closed channel
>>>>         :param int reply_code: The numeric reason the channel was closed
>>>>         :param str reply_text: The text reason the channel was closed
>>>>
>>>>         """
>>>>         LOGGER.warning('Channel %i was closed: (%s) %s',
>>>>                        channel, reply_code, reply_text)
>>>>         self._connection.close()
>>>>
>>>>     def on_channel_open(self, channel):
>>>>         """This method is invoked by pika when the channel has been opened.
>>>>         The channel object is passed in so we can make use of it.
>>>>
>>>>         Since the channel is now open, we'll declare the exchange to use.
>>>>
>>>>         :param pika.channel.Channel channel: The channel object
>>>>
>>>>         """
>>>>         LOGGER.info('Channel opened')
>>>>         self._channel = channel
>>>>         self.add_on_channel_close_callback()
>>>>         self.setup_exchange(self.EXCHANGE)
>>>>
>>>>     def setup_exchange(self, exchange_name):
>>>>         """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
>>>>         command. When it is complete, the on_exchange_declareok method will
>>>>         be invoked by pika.
>>>>
>>>>         :param str|unicode exchange_name: The name of the exchange to declare
>>>>
>>>>         """
>>>>         LOGGER.info('Declaring exchange %s', exchange_name)
>>>>         self._channel.exchange_declare(self.on_exchange_declareok,
>>>>                                        exchange_name,
>>>>                                        self.EXCHANGE_TYPE)
>>>>
>>>>     def on_exchange_declareok(self, unused_frame):
>>>>         """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
>>>>         command.
>>>>
>>>>         :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
>>>>
>>>>         """
>>>>         LOGGER.info('Exchange declared')
>>>>         self.setup_queue(self.QUEUE)
>>>>
>>>>     def setup_queue(self, queue_name):
>>>>         """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
>>>>         command. When it is complete, the on_queue_declareok method will
>>>>         be invoked by pika.
>>>>
>>>>         :param str|unicode queue_name: The name of the queue to declare.
>>>>
>>>>         """
>>>>         LOGGER.info('Declaring queue %s', queue_name)
>>>>         self._channel.queue_declare(self.on_queue_declareok, queue_name)
>>>>
>>>>     def on_queue_declareok(self, method_frame):
>>>>         """Method invoked by pika when the Queue.Declare RPC call made in
>>>>         setup_queue has completed. In this method we will bind the queue
>>>>         and exchange together with the routing key by issuing the Queue.Bind
>>>>         RPC command. When this command is complete, the on_bindok method will
>>>>         be invoked by pika.
>>>>
>>>>         :param pika.frame.Method method_frame: The Queue.DeclareOk frame
>>>>
>>>>         """
>>>>         LOGGER.info('Binding %s to %s with %s',
>>>>                     self.EXCHANGE, self.QUEUE, self.ROUTING_KEY)
>>>>         self._channel.queue_bind(self.on_bindok, self.QUEUE,
>>>>                                  self.EXCHANGE, self.ROUTING_KEY)
>>>>
>>>>     def add_on_cancel_callback(self):
>>>>         """Add a callback that will be invoked if RabbitMQ cancels the consumer
>>>>         for some reason. If RabbitMQ does cancel the consumer,
>>>>         on_consumer_cancelled will be invoked by pika.
>>>>
>>>>         """
>>>>         LOGGER.info('Adding consumer cancellation callback')
>>>>         self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
>>>>
>>>>     def on_consumer_cancelled(self, method_frame):
>>>>         """Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer
>>>>         receiving messages.
>>>>
>>>>         :param pika.frame.Method method_frame: The Basic.Cancel frame
>>>>
>>>>         """
>>>>         LOGGER.info('Consumer was cancelled remotely, shutting down: %r',
>>>>                     method_frame)
>>>>         if self._channel:
>>>>             self._channel.close()
>>>>
>>>>     def acknowledge_message(self, delivery_tag):
>>>>         """Acknowledge the message delivery from RabbitMQ by sending a
>>>>         Basic.Ack RPC method for the delivery tag.
>>>>
>>>>         :param int delivery_tag: The delivery tag from the Basic.Deliver frame
>>>>
>>>>         """
>>>>         LOGGER.info('Acknowledging message %s', delivery_tag)
>>>>         self._channel.basic_ack(delivery_tag)
>>>>
>>>>     def on_message(self, unused_channel, basic_deliver, properties, body):
>>>>         """Invoked by pika when a message is delivered from RabbitMQ. The
>>>>         channel is passed for your convenience. The basic_deliver object that
>>>>         is passed in carries the exchange, routing key, delivery tag and
>>>>         a redelivered flag for the message. The properties passed in is an
>>>>         instance of BasicProperties with the message properties and the body
>>>>         is the message that was sent.
>>>>
>>>>         :param pika.channel.Channel unused_channel: The channel object
>>>>         :param pika.Spec.Basic.Deliver: basic_deliver method
>>>>         :param pika.Spec.BasicProperties: properties
>>>>         :param str|unicode body: The message body
>>>>
>>>>         """
>>>>         LOGGER.info('Received message # %s from %s: %s',
>>>>                     basic_deliver.delivery_tag, properties.app_id, body)
>>>>         self.acknowledge_message(basic_deliver.delivery_tag)
>>>>
>>>>     def on_cancelok(self, unused_frame):
>>>>         """This method is invoked by pika when RabbitMQ acknowledges the
>>>>         cancellation of a consumer. At this point we will close the channel.
>>>>         This will invoke the on_channel_closed method once the channel has been
>>>>         closed, which will in-turn close the connection.
>>>>
>>>>         :param pika.frame.Method unused_frame: The Basic.CancelOk frame
>>>>
>>>>         """
>>>>         LOGGER.info('RabbitMQ acknowledged the cancellation of the consumer')
>>>>         self.close_channel()
>>>>
>>>>     def stop_consuming(self):
>>>>         """Tell RabbitMQ that you would like to stop consuming by sending the
>>>>         Basic.Cancel RPC command.
>>>>
>>>>         """
>>>>         if self._channel:
>>>>             LOGGER.info('Sending a Basic.Cancel RPC command to RabbitMQ')
>>>>             self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)
>>>>
>>>>     def start_consuming(self):
>>>>         """This method sets up the consumer by first calling
>>>>         add_on_cancel_callback so that the object is notified if RabbitMQ
>>>>         cancels the consumer. It then issues the Basic.Consume RPC command
>>>>         which returns the consumer tag that is used to uniquely identify the
>>>>         consumer with RabbitMQ. We keep the value to use it when we want to
>>>>         cancel consuming. The on_message method is passed in as a callback pika
>>>>         will invoke when a message is fully received.
>>>>
>>>>         """
>>>>         LOGGER.info('Issuing consumer related RPC commands')
>>>>         self.add_on_cancel_callback()
>>>>         self._consumer_tag = self._channel.basic_consume(
>>>>             self.on_message, self.QUEUE)
>>>>
>>>>     def on_bindok(self, unused_frame):
>>>>         """Invoked by pika when the Queue.Bind method has completed. At this
>>>>         point we will start consuming messages by calling start_consuming
>>>>         which will invoke the needed RPC commands to start the process.
>>>>
>>>>         :param pika.frame.Method unused_frame: The Queue.BindOk response frame
>>>>
>>>>         """
>>>>         LOGGER.info('Queue bound')
>>>>         self.start_consuming()
>>>>
>>>>     def close_channel(self):
>>>>         """Call to close the channel with RabbitMQ cleanly by issuing the
>>>>         Channel.Close RPC command.
>>>>
>>>>         """
>>>>         LOGGER.info('Closing the channel')
>>>>         self._channel.close()
>>>>
>>>>     def open_channel(self):
>>>>         """Open a new channel with RabbitMQ by issuing the Channel.Open RPC
>>>>         command. When RabbitMQ responds that the channel is open, the
>>>>         on_channel_open callback will be invoked by pika.
>>>>
>>>>         """
>>>>         LOGGER.info('Creating a new channel')
>>>>         self._connection.channel(on_open_callback=self.on_channel_open)
>>>>
>>>>     def run(self):
>>>>         """Run the example consumer by connecting to RabbitMQ and then
>>>>         starting the IOLoop to block and allow the SelectConnection to operate.
>>>>
>>>>         """
>>>>         self._connection = self.connect()
>>>>         self._connection.ioloop.start()
>>>>
>>>>     def stop(self):
>>>>         """Cleanly shutdown the connection to RabbitMQ by stopping the consumer
>>>>         with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok
>>>>         will be invoked by pika, which will then close the channel and
>>>>         connection. The IOLoop is started again because this method is invoked
>>>>         when CTRL-C is pressed raising a KeyboardInterrupt exception. This
>>>>         exception stops the IOLoop which needs to be running for pika to
>>>>         communicate with RabbitMQ. All of the commands issued prior to starting
>>>>         the IOLoop will be buffered but not processed.
>>>>
>>>>         """
>>>>         LOGGER.info('Stopping')
>>>>         self._closing = True
>>>>         self.stop_consuming()
>>>>         self._connection.ioloop.start()
>>>>         LOGGER.info('Stopped')
>>>>
>>>>
>>>> def main():
>>>>     logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
>>>>     example = ExampleConsumer('mqserver10', 5672, 'guest', 'guest')
>>>>     try:
>>>>         example.run()
>>>>     except KeyboardInterrupt:
>>>>         example.stop()
>>>>
>>>>
>>>> if __name__ == '__main__':
>>>>     main()
>>>>
>>>>
>>>> On Thu, Aug 15, 2013 at 3:25 PM, Ask Solem <ask at rabbitmq.com> wrote:
>>>>
>>>>>
>>>>> On Aug 15, 2013, at 1:20 PM, Priyanki Vashi <vashi.priyanki at gmail.com>
>>>>> wrote:
>>>>>
>>>>> > Hi there,
>>>>> >
>>>>> > This might be a very basic error, but since I am trying to use the
>>>>> > Tornado connection for the first time, help would be appreciated.
>>>>> >
>>>>> > I was trying to use the Tornado consumer example given on the
>>>>> > following pika site:
>>>>> >
>>>>> > https://pika.readthedocs.org/en/0.9.13/examples/tornado_consumer.html
>>>>> >
>>>>> > I am running into the following error. The example uses the URL
>>>>> > connection method, but I also tried the hostname and port method, and
>>>>> > tried an IP address instead of the hostname, and I still get the same
>>>>> > error.
>>>>> >
>>>>> > I have the pika 0.9.13 library installed. My consumer and producer work
>>>>> > fine with SelectConnection, so I believe the library is correctly
>>>>> > installed.
>>>>> >
>>>>> > Do I need to do something additional to get the Tornado connection
>>>>> > working for both producer and consumer with pika?
>>>>> >
>>>>> > Please suggest.
>>>>> >
>>>>> > mq1 at mqserver1:~/Producer_Receiver/Latest$ python tornedo_c_1.py
>>>>> > Traceback (most recent call last):
>>>>> >   File "tornedo_c_1.py", line 357, in <module>
>>>>> >     main()
>>>>> >   File "tornedo_c_1.py", line 351, in main
>>>>> >     example.run()
>>>>> >   File "tornedo_c_1.py", line 326, in run
>>>>> >     self._connection = self.connect()
>>>>> >   File "tornedo_c_1.py", line 59, in connect
>>>>> >     adapters.TornadoConnection(self.parameters, self.on_connection_open, stop_ioloop_on_close=False)
>>>>> > TypeError: 'NoneType' object is not callable
>>>>>
>>>>>
>>>>> Maybe you passed None as a callback somewhere?  What is the value of
>>>>> on_connection_open for example?
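
One way to tell the two cases apart, sketched with toy functions rather than pika itself: the traceback above ends on the consumer's own connect line, not inside the library, which suggests that the object being called on that line is None rather than a callback passed into it. Every name below is hypothetical; TornadoConnection = None only imitates an adapter attribute that was never populated.

```python
import sys
import traceback

# Stand-in for an adapter attribute that ended up as None (an assumption
# for this sketch, not something taken from pika's source).
TornadoConnection = None


def on_connection_open(connection):
    pass


def connect_calling_none():
    # The object being called is None, so the error is raised on this line.
    return TornadoConnection('parameters', on_connection_open)


def connect_passing_none(callback=None):
    # Here the callee is fine; the None callback fails only when invoked.
    def fake_adapter(parameters, on_open_callback):
        return on_open_callback('connection')
    return fake_adapter('parameters', callback)


def innermost_frame(func):
    """Return the name of the function in which the TypeError was raised."""
    try:
        func()
    except TypeError:
        frames = traceback.extract_tb(sys.exc_info()[2])
        return frames[-1][2]
    return None


print(innermost_frame(connect_calling_none))  # connect_calling_none
print(innermost_frame(connect_passing_none))  # fake_adapter
```

In the first case the last traceback frame is the caller's own function, as in the report; in the second it is a frame inside the callee.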
>>>>>
>>>>> _______________________________________________
>>>>> rabbitmq-discuss mailing list
>>>>> rabbitmq-discuss at lists.rabbitmq.com
>>>>> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>>>>>