[rabbitmq-discuss] ShutdownSignalException second 'channel.open'
Steve Powell
steve at rabbitmq.com
Thu Mar 29 17:37:02 BST 2012
Yogesh,
Apologies for not getting back to you sooner about this.
You appear to be 'polling' the App1_Return queue; taking things off and
requeueing them. I presume it is not possible for two different tasks to
interfere? While you are 'polling' the queue (and will requeue the
message) might another task look and see the _Return queue is empty?
Surely not.
Using a rabbitmq queue for storing one 'event' message like this is:
a) inherently buggy (races and stale data are two bad things that can
happen)
b) slow (polling rabbitmq queues is network inefficient).
If all you want is that the process posts a message (reply) when it
finishes, this sounds like a perfect RPC application. If the process can
guarantee to respond somehow in all cases; then there is no need to poll
on the Return queue, just have a Consumer listen to it.
You might want to look at the RpcClient/Server classes in the
com.rabbitmq.client package.
One reason to introduce an external db is that rabbitmq queues aren't
databases. However, a much lighter-weight Java solution ought to be
possible if the external system is controlled synchronously by a Java
monitor thread, or else use the return queue like a reply-to queue and
monitor it with a Consumer as I suggest above.
> Can you give me an elegant pattern for
> - Process message from Main Queue
Yes -- use a DefaultConsumer (not the QueueingConsumer) overriding
handleDelivery() to process messages received, and handleShutdown() if
you want to avoid the shutdownListener() as well.
> - Declare/Redeclare appropriate Queue
declare
> - Attach a consume on this queue doing ch.basicConsume(...) only
> if there are no consumers already on this queueUse the exclusive flag on the:
>
> public String basicConsume( String queue
> , boolean autoAck
> , String consumerTag
> , boolean noLocal
> , boolean exclusive
> , Map<String, Object> arguments
> , Consumer callback )
>
> call to have the consume fail if there is already a Consumer on that
> Queue. The parameter arguments can be null; noLocal is not supported (so
> set it to false); and set consumerTag to "" if you don't want to set
> your own tag.
>
> - When cleaning method kicks in
> - do ch.basicCancel on all queues, wait till you get
> handleCancelOk on all consumers
basicCancel is an operation on the Consumer (and needs a tag), not on the queue.
> - check message count of all queues
> - delete all queues which are empty and have 0 consumers
> btw, I use http://localhost:55672/api/queues/%2f?columns=name,messages,consumers
> to get queues, consumers and messages and then fire
> foreach(queue)
> if(messages == 0 && consumers == 0)
> ch.queueDelete(queue, true, true)
> Is this call http://localhost:55672/api/queues/%2f?columns=name,messages,consumers
> reliable?
It is inherently unreliable to delete in this manner, because this is an
asynchronous system. Asking first and then doing always opens a window
during which the resource you queried changes. The information you get
back from an external management call is bound to be out-of-date as soon
as you get it. Better to either:
set the queue to be auto-delete (this will go when all the Consumers are
cancelled), or use
public Queue.DeleteOk queueDelete( String queue
, boolean ifUnused
, boolean ifEmpty )
which allows you to only delete the queue if it is empty and/or unused.
I hope this gives you some ideas. It is hard to be more precise without
getting down to your application details.
Steve Powell (a happy bunny)
----------some more definitions from the SPD----------
chinchilla (n.) Cooling device for the lower jaw.
socialcast (n.) Someone to whom everyone is speaking but nobody likes.
literacy (n.) A textually transmitted disease usually contracted in childhood.
On 3 Feb 2012, at 03:45, Yogesh Ketkar wrote:
> Hi Steve,
>
> Couple of things
>
> 1. Purpose of _Return queues
> Say App1 Queues has 10 messages and first one is processed by
> calling an external system. This system
> operates in asynchronous manner and just returns task-id. I put it
> on App1_Return queue and keep on
> polling this queue, get task-id and query external system for
> status. If task is still going on, I
> re-queue the message to App1_Return queue.
> To process any message on App1 queue, I check if there is any
> message on App1_Return queue. If there
> is none, only then can I process message from App1 queue as
> otherwise, it indicates that previous
> operation is on App1 is still not complete.
> App1_Return queue at any point of time will only have at most one
> message.
> This probably can be done using an external db, but why introduce
> another system.
>
> 2.
>> Just because you have thousands of queues doesn't mean you need to have
>> thousands of Consumer instances -- it is quite ok to use the same Consumer on
>> several basicConsume calls. The consumerTag is passed to every Consumer callback
>> so the Consumer code can distinguish for which basicConsume it is called.
> Can you give me an elegant pattern for
> - Process message from Main Queue
> - Declare/Redeclare appropriate Queue
> - Attach a consume on this queue doing ch.basicConsume(...) only
> if there are no consumers already on this queue
> - When cleaning method kicks in
> - do ch.basicCancel on all queues, wait till you get
> handleCancelOk on all consumers
> - check message count of all queues
> - delete all queues which are empty and have 0 consumers
> btw, I use http://localhost:55672/api/queues/%2f?columns=name,messages,consumers
> to get queues, consumers and messages and then fire
> foreach(queue)
> if(messages == 0 && consumers == 0)
> ch.queueDelete(queue, true, true)
> Is this call http://localhost:55672/api/queues/%2f?columns=name,messages,consumers
> reliable?
>
> regards, Yogesh
>
> On Feb 1, 3:35 pm, Steve Powell <st... at rabbitmq.com> wrote:
>> Yogesh,
>>
>> Thanks for your explanation of your application structure, and the version you
>> are running.
>>
>> I do not know how the second channel.open AMQP command (for the same channel)
>> was sent to the broker; can you send some more diagnostics of this failure (log,
>> full stack trace) so I can raise a bug. If possible, a demonstrating small
>> program. Thanks.
>>
>> I see your difficulty. I would hope that creating 20000 queues (which are empty
>> almost all of the time) would not take up too much room, so I think you should
>> consider not deleting them at all. This would solve most of your problem.
>>
>> The x-expires attribute for queues, really ought to have a 'only-if-empty'
>> option. I'll raise a bug (24722) for this, and see how difficult it would be.
>>
>> I do not understand what the _Return queues are for. They look as though you are
>> using them as a private persistent store for each App. It would be better if you
>> put this information somewhere else, but heigh-ho -- you can safely delete these
>> queues since this app is in complete control of them, and the events are
>> processed serially.
>>
>> One of the reasons, I don't do basicConsume on queues is, I am going to have
>> thousands of queues. I rather thought it would be easier to have a thread pool,
>> each consuming just one message from a queue in round-robin fashion.
>>
>> Just because you have thousands of queues doesn't mean you need to have
>> thousands of Consumer instances -- it is quite ok to use the same Consumer on
>> several basicConsume calls. The consumerTag is passed to every Consumer callback
>> so the Consumer code can distinguish for which basicConsume it is called.
>>
>> By default, the Java client will allocate a pool of 5 threads for each
>> connection, and dispatch Consumer callbacks onto one of those threads for you.
>> So your thread pool might be unnecessary if you used basicConsume(). I'd
>> consider doing that since basicGet() can be slow, and you'd have to get from
>> each potential queue just to tell if there is a message for it!
>>
>> Steve Powell (a happy bunny)
>> ----------some more definitions from the SPD----------
>> vermin (v.) Treating the dachshund for roundworm.
>> chinchilla (n.) Cooling device for the lower jaw.
>> socialcast (n.) Someone to whom everyone is speaking but nobody likes.
>>
>> On 31 Jan 2012, at 13:56, Yogesh Ketkar wrote:
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>> I am using RabbitMQ server version 2.7.1.
>>> Java Client Jar used is
>>> rabbitmq-java-client-bin-2.7.1/rabbitmq-client.jar
>>
>>>> The Shutdown seems to be being called because the channel is being opened twice.
>>>> The broker complains about this and closes the connection. Are you creating
>>>> channels on different threads simultaneously?
>>> Yes indeed, I am creating channels on different threads.
>>
>>>> I don't think the basicPublish will fail if the queue doesn't exist. Why would
>>>> you create a new channel in this case?
>>> Yes, you are right. I will basically lose the message in this case.
>>
>>> Now about overall problem statement.
>>> My application has a main queue which looks like
>>> MainQueue
>>> - App1-Event1
>>> - App2-Event1
>>> - App1-Event2
>>> - App1-Event3
>>> - App3-Event1
>>> - App3-Event2
>>> - App2-Event2
>>
>>> Basically there are going to be events from different Apps (there can
>>> be thousands of apps) and events belonging to an App must
>>> be processed sequentially. Events across different apps can and should
>>> be be processed in parallel.
>>> So I have only one consumer on MainQueue (using basicConsume) which
>>> reads events from MainQueue and just moves it to appropriate declared/
>>> redeclared queue.
>>> So this is how new queue structure would look like.
>>
>>> App1
>>> - App1-Event1
>>> - App1-Event2
>>> - App1-Event3
>>
>>> App2
>>> - App2-Event1
>>> - App2-Event2
>>
>>> App3
>>> - App3-Event1
>>> - App3-Event2
>>
>>> Now again when Event1 is processed from Queue App1, Event2 of App1
>>> can't be processed unless processing of Event1 is complete.
>>> Processing of event involves asynchronous communication with external
>>> systems, so once Event1 is fetched (and acknowledged) from queue
>>> App1,
>>> I create another queue like
>>> App1_Return
>>> - App1-Event1-TaskId
>>
>>> I need to query external system using TaskId after certain time
>>> interval, to check status of event processing of Event1. Once I get
>>> the status (either sucess or failure)
>>> I discard App1-Event1-TaskId and ready to process App1-Event2. So all
>>> _Return queues will only have one event at any point of time.
>>
>>> An event on an app might even occur once a day. So I don't want to
>>> keep so many queues (potentially 20000 if there are 10000 apps)
>>> hanging around.
>>> Both auto_delete and x-expires are not very useful as in both the
>>> schemes, queues get deleted even when they have messages.
>>> Ideally whenever last message from any Queue (except MainQueue) is
>>> consumed, I want to delete that queue. Of course, one has to make sure
>>> while a queue is getting deleted, there might be an event destined for
>>> that. So if one guy is doing
>>> queueDelete('somequeue', true, true) and other guy is doing
>>> queueDeclare, queueBind, basicPublish. If queueDelete gets executed
>>> after
>>> queueBind, message will be lost.
>>
>>> One of the reasons, I don't do basicConsume on queues is, I am going
>>> to have thousands of queues. I rather thought it would be easier to
>>> have a thread pool, each consuming just one message from a queue in
>>> round-robin fashion.
>>
>>> As was mentioned in some other response, I will certainly not create a
>>> new channel in every thread, but would rather try and reuse them.
>>
>>> Thanks for all the help.
>>> Regards, Yogesh
>>
>>> On Jan 31, 5:19 pm, Steve Powell <st... at rabbitmq.com> wrote:
>>>> Yogesh,
>>
>>>> Please can you provide some information about your environment? And your
>>>> application? What version of RabbitMQ (and client) are you using?
>>
>>>> In your stack trace the ShutdownListener you registered is apparently being
>>>> called, because the Connection is being shut down. It is not clear why this
>>>> exception (and its associated stack trace) appears, it seems to come from your
>>>> Listener code, but perhaps that does nothing.
>>
>>>> The Shutdown seems to be being called because the channel is being opened twice.
>>>> The broker complains about this and closes the connection. Are you creating
>>>> channels on different threads simultaneously? (Looking at your app 'design' you
>>>> might be.) Depending upon the version of RabbitMQ this might cause a problem.
>>
>>>> I'm afraid your application design is unclear:
>>
>>>>> This is how I handle doing basicPublish and basicGet on potentially
>>>>> non-existent queues
>>>>> - publish involves 3 steps
>>>>> queueDeclare
>>>>> queueBind
>>>>> basicPublish
>>>>> If some other thread deletes the queue after either queueDeclare or
>>>>> queueBind, basicPublish fails and I again create a new
>>>>> channel and do these operations
>>
>>>> I don't think the basicPublish will fail if the queue doesn't exist. Why would
>>>> you create a new channel in this case?
>>
>>>> Please explain why you expect the queue might be deleted by some other thread.
>>
>>>>> - if basicGet fails, I simply ignore it
>>
>>>> What do you mean by ignoring it? Do you poll the queue periodically? Why aren't
>>>> you using basicConsume and a Consumer to get messages (which will be notified if
>>>> the queue is deleted)?
>>
>>>> Steve Powell (a loopy bunny)
>>>> ----------some more definitions from the SPD----------
>>>> vermin (v.) Treating the dachshund for roundworm.
>>>> chinchilla (n.) Cooling device for the lower jaw.
>>>> socialcast (n.) Someone to whom everyone is speaking but nobody likes.
>>
>>>> On 30 Jan 2012, at 04:33, Yogesh Ketkar wrote:
>>
>>>>> Only operations I ever do with com.rabbitmq.client.Connection in the
>>>>> code are
>>>>> c.addShutdownListener
>>>>> c.createChannel
>>
>>>>> What does this error signify?
>>
>>>>> 2012-01-30 09:44:45,158 ERROR [ConnectionShutdownHandler]
>>>>> ShutdownListener
>>>>> com.rabbitmq.client.ShutdownSignalException: connection error; reason:
>>>>> {#method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID
>>>>> - second 'channel.open' seen, class-id=20, method-id=10), null,
>>>>> "[B at 105691e"}
>>>>> at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:
>>>>> 641)
>>>>> at
>>>>> com.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection. java:
>>>>> 599)
>>>>> at
>>>>> com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection. java:
>>>>> 571)
>>>>> at com.rabbitmq.client.impl.AMQConnection
>>>>> $1.processAsync(AMQConnection.java:88)
>>>>> at
>>>>> com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel .java:
>>>>> 144)
>>>>> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:
>>>>> 91)
>>>>> at com.rabbitmq.client.impl.AMQConnection
>>>>> $MainLoop.run(AMQConnection.java:500)
>>
>>>>> Some additional info.
>>>>> I create and close thousands of channels in the code. But at any point
>>>>> of time there are not more than 20/21 channels open.
>>>>> This is how I handle doing basicPublish and basicGet on potentially
>>>>> non-existent queues
>>>>> - publish involves 3 steps
>>>>> queueDeclare
>>>>> queueBind
>>>>> basicPublish
>>>>> If some other thread deletes the queue after either queueDeclare or
>>>>> queueBind, basicPublish fails and I again create a new
>>>>> channel and do these operations
>>>>> - if basicGet fails, I simply ignore it
>>
>>>>> regards, Yogesh
>>>>> _______________________________________________
>>>>> rabbitmq-discuss mailing list
>>>>> rabbitmq-disc... at lists.rabbitmq.com
>>>>> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>>
>>>> _______________________________________________
>>>> rabbitmq-discuss mailing list
>>>> rabbitmq-disc... at lists.rabbitmq.comhttps://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>>> _______________________________________________
>>> rabbitmq-discuss mailing list
>>> rabbitmq-disc... at lists.rabbitmq.com
>>> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>>
>> _______________________________________________
>> rabbitmq-discuss mailing...
>>
>> read more »
More information about the rabbitmq-discuss
mailing list