[rabbitmq-discuss] tx_commit() semantics with async connections (e.g Pika SelectConnection)

Matt Pietrek mpietrek at skytap.com
Fri Aug 17 20:13:07 BST 2012


I've been drilling into an issue for a few days where my Pika channel is
abruptly closed. I'm pretty sure I've found either:

   - A large gap in my understanding
   - A misuse of the Rabbit/Pika APIs
   - A serious bug in Pika.

My original understanding about transactions is that calling tx_select() is
a synchronous operation, and that when it returns, whatever actions you've
sent to the broker are committed.

However, in stepping through the Pika tx_commit() code, it seems like it
all it does is add the Tx.Commit message to an outgoing buffer, and that
the tx_commit() call returns without anything actually sent to the broker.
(I've set breakpoints in the debugger at strategic points to verify this.)

In my message handler, I'm writing a reply to the incoming message, using
the same channel. What I'm seeing (verified via WireShark) is that the
Tx.Commit message is followed by some number of Basic.Publish,
Content-Header, and Content-Body records corresponding to my reply message
writes. This violates the AMQP standard, which says that nothing should be
sent to the broker after sending a Tx.Commit and before receiving a
Tx.Commit-Ok reply. (If it helps understanding, my incoming message queue
has a few messages in it already, so I see a series of incoming messages
immediately after starting.)

I see that the tx_commit() method takes a callback method, but if I were to
simply block in my message handler, waiting for the callback to be invoked,
I'd deadlock because Pika's buffer processing code doesn't get a chance to
run.
So given the above, is it me? Is it Pika?

My code looks like this:

def echo_on_connected(connection):
    """Called when we are fully connected to RabbitMQ"""
    # Open a channel
    connection.channel(echo_on_channel_open)

# Step #3
def echo_on_channel_open(channel):
    channel.queue_declare(queue=echo_queue_name, durable=True,
arguments={"x-ha-policy": "all"})
    channel.tx_select()
    channel.basic_consume(echo_handle_delivery, queue=echo_queue_name)

def echo_handle_delivery(channel, method, header, body):
    print "Received message %s" % (body)

    # Write a reply message on the same channel
    channel.basic_publish(exchange='',
        routing_key='reply_queue',
        body="replying to %s" % (body),
        properties=BasicProperties(delivery_mode=2))

    # Ack the incoming message, then force the write to be flushed.
    channel.basic_ack(delivery_tag=method.delivery_tag)
    channel.tx_commit()

def echo_thread():
    connection_parameters = ConnectionParameters(host=broker_host)
    echo_connection = SelectConnection(connection_parameters,
echo_on_connected)
    echo_connection.ioloop.start()
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20120817/dc5e34c6/attachment.htm>


More information about the rabbitmq-discuss mailing list