[rabbitmq-discuss] ShutdownSignalException second 'channel.open'

Steve Powell steve at rabbitmq.com
Thu Mar 29 17:37:02 BST 2012


Yogesh,

Apologies for not getting back to you sooner about this.

You appear to be 'polling' the App1_Return queue; taking things off and
requeueing them. I presume it is not possible for two different tasks to
interfere? While you are 'polling' the queue (and will requeue the
message) might another task look and see the _Return queue is empty?
Surely not.

Using a rabbitmq queue for storing one 'event' message like this is:

a) inherently buggy (races and stale data are two bad things that can
   happen)
b) slow (polling rabbitmq queues is network inefficient).

If all you want is for the process to post a (reply) message when it
finishes, this sounds like a perfect RPC application. If the process can
guarantee to respond somehow in all cases, then there is no need to poll
the Return queue; just have a Consumer listen to it.

You might want to look at the RpcClient and RpcServer classes in the
com.rabbitmq.client package.

One reason to introduce an external db is that rabbitmq queues aren't
databases. However, a much lighter-weight Java solution ought to be
possible: either have a Java monitor thread control the external system
synchronously, or use the return queue like a reply-to queue and
monitor it with a Consumer, as I suggest above.
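
As a rough illustration of the "Java monitor thread" idea, here is a
minimal sketch that uses only the JDK and no RabbitMQ calls. It assumes
each task blocks until the external system reports success or failure.
The class name PerAppSequencer and its submit() method are hypothetical,
invented for this example. Tasks for one app run strictly one after
another, while tasks for different apps may run in parallel on the pool.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

/** Hypothetical helper: runs tasks of one app in order, different apps in parallel. */
final class PerAppSequencer {
    private final Executor pool;
    // Most recently scheduled task per app; the next task is chained after it.
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    public PerAppSequencer(Executor pool) {
        this.pool = pool;
    }

    /**
     * Schedules a task for the given app. The task is assumed to block until
     * the external system has finished (assumption, not taken from the thread).
     */
    public CompletableFuture<Void> submit(String appId, Runnable task) {
        // compute() is atomic per key, so concurrent submitters cannot
        // both chain onto the same predecessor.
        return tails.compute(appId, (id, tail) -> {
            CompletableFuture<Void> previous =
                    (tail == null) ? CompletableFuture.<Void>completedFuture(null) : tail;
            // handle() swallows a failure of the previous task so that the
            // next event for this app is still processed.
            return previous.handle((result, error) -> null).thenRunAsync(task, pool);
        });
    }
}
```

A Consumer on the main queue could then call something like
seq.submit("App1", task) for each event it receives, which would remove
the need for the _Return queues altogether. Note that this sketch never
removes idle apps from the map and keeps no state across restarts.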

> Can you give me an elegant pattern for
> - Process message from Main Queue
Yes -- use a DefaultConsumer (not the QueueingConsumer), overriding
handleDelivery() to process messages received, and handleShutdownSignal()
if you want to avoid registering a ShutdownListener as well.

> - Declare/Redeclare appropriate Queue
Just declare it with queueDeclare(); redeclaring an existing queue with
the same arguments is harmless.

> - Attach a consume on this queue doing ch.basicConsume(...) only
>  if there are no consumers already on this queue
Use the exclusive flag on the:

 public String basicConsume( String queue
                           , boolean autoAck
                           , String consumerTag
                           , boolean noLocal
                           , boolean exclusive
                           , Map<String, Object> arguments
                           , Consumer callback )

call to have the consume fail if there is already a Consumer on that
Queue. The parameter arguments can be null; noLocal is not supported (so
set it to false); and set consumerTag to "" if you don't want to set
your own tag.


> - When cleaning method kicks in
>  - do ch.basicCancel on all queues, wait till you get
>    handleCancelOk on all consumers
basicCancel is an operation on the Consumer (and needs a tag), not on the queue.

>  - check message count of all queues
>  - delete all queues which are empty and have 0 consumers
>    btw, I use http://localhost:55672/api/queues/%2f?columns=name,messages,consumers
>    to get queues, consumers and messages and then fire
>    foreach(queue)
>    if(messages == 0 && consumers == 0)
>        ch.queueDelete(queue, true, true)
>    Is this call http://localhost:55672/api/queues/%2f?columns=name,messages,consumers
>    reliable?
It is inherently unreliable to delete in this manner, because this is an
asynchronous system. Asking first and then doing always opens a window
during which the resource you queried changes. The information you get
back from an external management call is bound to be out-of-date as soon
as you get it. Better to either:

set the queue to be auto-delete (it will then be deleted when its last
Consumer is cancelled), or use

 public Queue.DeleteOk queueDelete( String queue
                                  , boolean ifUnused
                                  , boolean ifEmpty )

which allows you to only delete the queue if it is empty and/or unused.
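
To make the "ask first, then do" window concrete, here is a toy,
single-process model. It is not the RabbitMQ client API: the class
ToyQueueTable and all of its methods are made up for illustration. It
contrasts a separate count-then-delete with a delete that checks and
removes in one atomic step, which is the idea behind the ifEmpty flag.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Toy in-memory stand-in for a broker's queue table; NOT the RabbitMQ API. */
final class ToyQueueTable {
    private final Map<String, Deque<String>> queues = new HashMap<>();

    /** Creates the queue if it does not exist yet. */
    public synchronized void declare(String queue) {
        queues.computeIfAbsent(queue, q -> new ArrayDeque<>());
    }

    /** Declares the queue on first use, then appends the message. */
    public synchronized void publish(String queue, String message) {
        queues.computeIfAbsent(queue, q -> new ArrayDeque<>()).addLast(message);
    }

    /** A snapshot only: it may be stale as soon as the caller sees it. */
    public synchronized int messageCount(String queue) {
        Deque<String> q = queues.get(queue);
        return q == null ? 0 : q.size();
    }

    /** Unconditional delete; returns how many messages were thrown away. */
    public synchronized int delete(String queue) {
        Deque<String> q = queues.remove(queue);
        return q == null ? 0 : q.size();
    }

    /** Checks and deletes in one atomic step; refuses if messages are present. */
    public synchronized boolean deleteIfEmpty(String queue) {
        Deque<String> q = queues.get(queue);
        if (q != null && !q.isEmpty()) {
            return false; // a message arrived in the meantime; nothing is lost
        }
        queues.remove(queue);
        return true;
    }
}
```

If a caller reads messageCount("App1") as 0, a publish to App1 then
arrives, and the caller goes ahead with delete("App1"), that message is
lost. With deleteIfEmpty("App1") the same interleaving is refused
instead. In the real client, as far as I know, a refused conditional
queueDelete is reported as a channel-level error rather than a false
return value, so the boolean here is a simplification.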

I hope this gives you some ideas. It is hard to be more precise without
getting down to your application details.

Steve Powell  (a happy bunny)
----------some more definitions from the SPD----------
chinchilla (n.) Cooling device for the lower jaw.
socialcast (n.) Someone to whom everyone is speaking but nobody likes.
literacy (n.) A textually transmitted disease usually contracted in childhood.

On 3 Feb 2012, at 03:45, Yogesh Ketkar wrote:

> Hi Steve,
> 
> Couple of things
> 
> 1. Purpose of _Return queues
>   Say the App1 queue has 10 messages and the first one is processed by
>   calling an external system. This system operates in an asynchronous
>   manner and just returns a task-id. I put it on the App1_Return queue
>   and keep polling this queue: I get the task-id and query the external
>   system for status. If the task is still going on, I re-queue the
>   message to the App1_Return queue.
>   To process any message on the App1 queue, I check whether there is any
>   message on the App1_Return queue. Only if there is none can I process
>   a message from the App1 queue; otherwise the previous operation on
>   App1 is still not complete.
>   The App1_Return queue will have at most one message at any point in
>   time.
>   This probably can be done using an external db, but why introduce
>   another system?
> 
> 2.
>> Just because you have thousands of queues doesn't mean you need to have
>> thousands of Consumer instances -- it is quite ok to use the same Consumer on
>> several basicConsume calls. The consumerTag is passed to every Consumer callback
>> so the Consumer code can distinguish for which basicConsume it is called.
> Can you give me an elegant pattern for
> - Process message from Main Queue
> - Declare/Redeclare appropriate Queue
> - Attach a consume on this queue doing ch.basicConsume(...) only
>  if there are no consumers already on this queue
> - When cleaning method kicks in
>  - do ch.basicCancel on all queues, wait till you get
>    handleCancelOk on all consumers
>  - check message count of all queues
>  - delete all queues which are empty and have 0 consumers
>    btw, I use http://localhost:55672/api/queues/%2f?columns=name,messages,consumers
>    to get queues, consumers and messages and then fire
>    foreach(queue)
>    if(messages == 0 && consumers == 0)
>        ch.queueDelete(queue, true, true)
>    Is this call http://localhost:55672/api/queues/%2f?columns=name,messages,consumers
>    reliable?
> 
> regards, Yogesh
> 
> On Feb 1, 3:35 pm, Steve Powell <st... at rabbitmq.com> wrote:
>> Yogesh,
>> 
>> Thanks for your explanation of your application structure, and the version you
>> are running.
>> 
>> I do not know how the second channel.open AMQP command (for the same channel)
>> was sent to the broker; can you send some more diagnostics of this failure (log,
>> full stack trace) so I can raise a bug. If possible, a demonstrating small
>> program. Thanks.
>> 
>> I see your difficulty. I would hope that creating 20000 queues (which are empty
>> almost all of the time) would not take up too much room, so I think you should
>> consider not deleting them at all. This would solve most of your problem.
>> 
>> The x-expires attribute for queues, really ought to have a 'only-if-empty'
>> option. I'll raise a bug (24722) for this, and see how difficult it would be.
>> 
>> I do not understand what the _Return queues are for. They look as though you are
>> using them as a private persistent store for each App. It would be better if you
>> put this information somewhere else, but heigh-ho -- you can safely delete these
>> queues since this app is in complete control of them, and the events are
>> processed serially.
>> 
>>> One of the reasons, I don't do basicConsume on queues is, I am going to have
>>> thousands of queues. I rather thought it would be easier to have a thread pool,
>>> each consuming just one message from a queue in round-robin fashion.
>> 
>> Just because you have thousands of queues doesn't mean you need to have
>> thousands of Consumer instances -- it is quite ok to use the same Consumer on
>> several basicConsume calls. The consumerTag is passed to every Consumer callback
>> so the Consumer code can distinguish for which basicConsume it is called.
>> 
>> By default, the Java client will allocate a pool of 5 threads for each
>> connection, and dispatch Consumer callbacks onto one of those threads for you.
>> So your thread pool might be unnecessary if you used basicConsume(). I'd
>> consider doing that since basicGet() can be slow, and you'd have to get from
>> each potential queue just to tell if there is a message for it!
>> 
>> Steve Powell  (a happy bunny)
>> ----------some more definitions from the SPD----------
>> vermin (v.) Treating the dachshund for roundworm.
>> chinchilla (n.) Cooling device for the lower jaw.
>> socialcast (n.) Someone to whom everyone is speaking but nobody likes.
>> 
>> On 31 Jan 2012, at 13:56, Yogesh Ketkar wrote:
>> 
>>> I am using RabbitMQ server version 2.7.1.
>>> Java Client Jar used is
>>> rabbitmq-java-client-bin-2.7.1/rabbitmq-client.jar
>> 
>>>> The Shutdown seems to be being called because the channel is being opened twice.
>>>> The broker complains about this and closes the connection. Are you creating
>>>> channels on different threads simultaneously?
>>> Yes indeed, I am creating channels on different threads.
>> 
>>>> I don't think the basicPublish will fail if the queue doesn't exist. Why would
>>>> you create a new channel in this case?
>>> Yes, you are right. I will basically lose the message in this case.
>> 
>>> Now about overall problem statement.
>>> My application has a main queue which looks like
>>> MainQueue
>>>  - App1-Event1
>>>  - App2-Event1
>>>  - App1-Event2
>>>  - App1-Event3
>>>  - App3-Event1
>>>  - App3-Event2
>>>  - App2-Event2
>> 
>>> Basically there are going to be events from different Apps (there can
>>> be thousands of apps) and events belonging to an App must
>>> be processed sequentially. Events across different apps can and should
>>> be processed in parallel.
>>> So I have only one consumer on MainQueue (using basicConsume) which
>>> reads events from MainQueue and just moves it to appropriate declared/
>>> redeclared queue.
>>> So this is how new queue structure would look like.
>> 
>>> App1
>>>  - App1-Event1
>>>  - App1-Event2
>>>  - App1-Event3
>> 
>>> App2
>>>  - App2-Event1
>>>  - App2-Event2
>> 
>>> App3
>>>  - App3-Event1
>>>  - App3-Event2
>> 
>>> Now again when Event1 is processed from Queue App1, Event2 of App1
>>> can't be processed unless processing of Event1 is complete.
>>> Processing of event involves asynchronous communication with external
>>> systems, so once Event1 is fetched (and acknowledged) from queue
>>> App1,
>>> I create another queue like
>>> App1_Return
>>>  - App1-Event1-TaskId
>> 
>>> I need to query external system using TaskId after certain time
>>> interval, to check status of event processing of Event1. Once I get
>>> the status (either success or failure)
>>> I discard App1-Event1-TaskId and am ready to process App1-Event2. So all
>>> _Return queues will only have one event at any point of time.
>> 
>>> An event on an app might even occur once a day. So I don't want to
>>> keep so many queues (potentially 20000 if there are 10000 apps)
>>> hanging around.
>>> Both auto_delete and x-expires are not very useful as in both the
>>> schemes, queues get deleted even when they have messages.
>>> Ideally whenever last message from any Queue (except MainQueue) is
>>> consumed, I want to delete that queue. Of course, one has to make sure
>>> while a queue is getting deleted, there might be an event destined for
>>> that. So if one guy is doing
>>> queueDelete('somequeue', true, true) and other guy is doing
>>> queueDeclare, queueBind, basicPublish. If queueDelete gets executed
>>> after
>>> queueBind, message will be lost.
>> 
>>> One of the reasons, I don't do basicConsume on queues is, I am going
>>> to have thousands of queues. I rather thought it would be easier to
>>> have a thread pool, each consuming just one message from a queue in
>>> round-robin fashion.
>> 
>>> As was mentioned in some other response, I will certainly not create a
>>> new channel in every thread, but would rather try and reuse them.
>> 
>>> Thanks for all the help.
>>> Regards, Yogesh
>> 
>>> On Jan 31, 5:19 pm, Steve Powell <st... at rabbitmq.com> wrote:
>>>> Yogesh,
>> 
>>>> Please can you provide some information about your environment? And your
>>>> application? What version of RabbitMQ (and client) are you using?
>> 
>>>> In your stack trace the ShutdownListener you registered is apparently being
>>>> called, because the Connection is being shut down. It is not clear why this
>>>> exception (and its associated stack trace) appears, it seems to come from your
>>>> Listener code, but perhaps that does nothing.
>> 
>>>> The Shutdown seems to be being called because the channel is being opened twice.
>>>> The broker complains about this and closes the connection. Are you creating
>>>> channels on different threads simultaneously? (Looking at your app 'design' you
>>>> might be.) Depending upon the version of RabbitMQ this might cause a problem.
>> 
>>>> I'm afraid your application design is unclear:
>> 
>>>>> This is how I handle doing basicPublish and basicGet on potentially
>>>>> non-existent queues
>>>>> - publish involves 3 steps
>>>>>  queueDeclare
>>>>>  queueBind
>>>>>  basicPublish
>>>>>  If some other thread deletes the queue after either queueDeclare or
>>>>> queueBind, basicPublish fails and I again create a new
>>>>>  channel and do these operations
>> 
>>>> I don't think the basicPublish will fail if the queue doesn't exist. Why would
>>>> you create a new channel in this case?
>> 
>>>> Please explain why you expect the queue might be deleted by some other thread.
>> 
>>>>> - if basicGet fails, I simply ignore it
>> 
>>>> What do you mean by ignoring it? Do you poll the queue periodically? Why aren't
>>>> you using basicConsume and a Consumer to get messages (which will be notified if
>>>> the queue is deleted)?
>> 
>>>> Steve Powell  (a loopy bunny)
>>>> ----------some more definitions from the SPD----------
>>>> vermin (v.) Treating the dachshund for roundworm.
>>>> chinchilla (n.) Cooling device for the lower jaw.
>>>> socialcast (n.) Someone to whom everyone is speaking but nobody likes.
>> 
>>>> On 30 Jan 2012, at 04:33, Yogesh Ketkar wrote:
>> 
>>>>> Only operations I ever do with com.rabbitmq.client.Connection in the
>>>>> code are
>>>>>    c.addShutdownListener
>>>>>    c.createChannel
>> 
>>>>> What does this error signify?
>> 
>>>>> 2012-01-30 09:44:45,158 ERROR  [ConnectionShutdownHandler] ShutdownListener
>>>>> com.rabbitmq.client.ShutdownSignalException: connection error; reason:
>>>>> {#method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID
>>>>> - second 'channel.open' seen, class-id=20, method-id=10), null,
>>>>> "[B@105691e"}
>>>>>    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:641)
>>>>>    at com.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection.java:599)
>>>>>    at com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:571)
>>>>>    at com.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:88)
>>>>>    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
>>>>>    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
>>>>>    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:500)
>> 
>>>>> Some additional info.
>>>>> I create and close thousands of channels in the code. But at any point
>>>>> of time there are not more than 20/21 channels open.
>>>>> This is how I handle doing basicPublish and basicGet on potentially
>>>>> non-existent queues
>>>>> - publish involves 3 steps
>>>>>  queueDeclare
>>>>>  queueBind
>>>>>  basicPublish
>>>>>  If some other thread deletes the queue after either queueDeclare or
>>>>> queueBind, basicPublish fails and I again create a new
>>>>>  channel and do these operations
>>>>> - if basicGet fails, I simply ignore it
>> 
>>>>> regards, Yogesh
>>>>> _______________________________________________
>>>>> rabbitmq-discuss mailing list
>>>>> rabbitmq-disc... at lists.rabbitmq.com
>>>>> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>> 


