[rabbitmq-discuss] Reproducible error for remote client -- was Re: Distributed queue setup
Meredith Gregory
lgreg.meredith at gmail.com
Fri Jul 8 02:17:33 BST 2011
Dear Rabbitters,
It turns out our problem is even simpler. If a client on machine B connects
to the RabbitMQ broker on machine A and begins sending messages, the broker
closes the connection unless we also start a client that connects to the
broker on machine A and begins reading the messages. The number of messages
sent before the closure depends on the size of the message. I have a trace
below, and I will endeavor to get you a much smaller piece of code for repro.
We have been able to reproduce this error with virtually every Java client
version from 1.4 to the current 2.5.1. All tests are conducted on Mac OS X
Snow Leopard.
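For reference, the handshake workaround mentioned in my earlier message
(quoted further down) boils down to "the sender does not publish until the
reader has said it is ready". The following is only a minimal in-process
sketch of that idea, not our actual code and not RabbitMQ client code: the
BlockingQueue is a stand-in for the broker queue, the CountDownLatch is a
stand-in for whatever ready signal A would send to B, and the class and
method names (ReadyHandshakeSketch, run) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-process model only (assumption): the BlockingQueue stands in for the
// broker queue and the CountDownLatch for the "A is ready" signal.
public class ReadyHandshakeSketch {

    // Sends every message in toSend, but only after the reader has signalled
    // readiness. Returns what the reader received, in arrival order.
    public static List<String> run(List<String> toSend, long timeoutMs)
            throws InterruptedException {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        final CountDownLatch readerReady = new CountDownLatch(1);
        final List<String> received = new ArrayList<>();
        final int expected = toSend.size();

        // Plays the role of node A: announce readiness, then consume.
        Thread reader = new Thread(() -> {
            readerReady.countDown();
            try {
                for (int i = 0; i < expected; i++) {
                    received.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.start();

        // Plays the role of node B: refuse to publish until A is ready.
        if (!readerReady.await(timeoutMs, TimeUnit.MILLISECONDS)) {
            reader.interrupt();
            throw new IllegalStateException("reader not ready; not sending");
        }
        for (String m : toSend) {
            queue.put(m);
        }

        // Wait for the reader to drain the queue before touching 'received'.
        reader.join(timeoutMs);
        if (reader.isAlive()) {
            reader.interrupt();
            throw new IllegalStateException("reader did not finish in time");
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> msgs = new ArrayList<>();
        msgs.add("msg0");
        msgs.add("msg1");
        msgs.add("msg2");
        System.out.println(run(msgs, 1000));
    }
}
```

In a real two-machine setup the ready signal would itself have to travel over
some channel (a second queue, for instance), which is exactly the extra
machinery we would like to avoid if there is a Rabbitier way.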
Best wishes,
--greg
Welcome to Scala version 2.9.0.1 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_24).
Type in expressions to have them evaluated.
Type :help for more information.
scala> :load /Users/lgm/work/src/projex/stellar/mdp4tw/SpecialK.moniker/scripts/rabbit-connectivity.scala
Loading /Users/lgm/work/src/projex/stellar/mdp4tw/SpecialK.moniker/scripts/rabbit-connectivity.scala...
import com.biosimilarity.lift.model.store._
import com.biosimilarity.lift.model.store.usage._
import com.biosimilarity.lift.lib._
import com.biosimilarity.lift.lib.usage._
import scala.util.continuations._
defined module RabbitMQConnectivityTest
scala> import RabbitMQConnectivityTest._
import RabbitMQConnectivityTest._
scala> import AMQPDefaults._
import AMQPDefaults._
scala> import MonadicAMQPUnitTest._
import MonadicAMQPUnitTest._
scala> import scala.collection.mutable.HashMap
import scala.collection.mutable.HashMap
scala> val fqn1 = freshQueueName
fqn1: String = amqp_db6b2925-23cb-46d3-af1a-8b1ffdc0639b
scala> val ( sma1, jsdnr, jsdsp, msgs ) = setupConnectionApparatus( fqn1, "10.0.1.9", "10.0.1.5", 100, new HashMap[String,List[Msg]]() )
sma1: com.biosimilarity.lift.lib.SMJATwistedPair[com.biosimilarity.lift.lib.usage.MonadicAMQPUnitTest.Msg] = com.biosimilarity.lift.lib.SMJATwistedPair@4c900608
jsdnr: net.liftweb.amqp.JSONAMQPSender = net.liftweb.amqp.JSONAMQPSender@37c1e7d
jsdsp: com.biosimilarity.lift.lib.StdMonadicJSONAMQPDispatcher[com.biosimilarity.lift.lib.usage.MonadicAMQPUnitTest.Msg] = com.biosimilarity.lift.lib.StdMonadicJSONAMQPDispatcher@49b5a254
msgs: scala.collection.immutable.Stream[com.biosimilarity.lift.lib.usage.MonadicAMQPUnitTest.Msg] = Stream(Msg(msg0,0,true,None), ?)
scala> sma1.send( msgs( 89 ) )
scala> sma1.send( msgs( 88 ) )
scala> net.liftweb.amqp.JSONAMQPSender@37c1e7d: caught com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channel
com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channel
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:181)
at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:276)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:521)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:507)
at net.liftweb.amqp.AMQPSender.send(AMQPSender.scala:30)
at net.liftweb.amqp.AMQPSender$$anonfun$loop$1.apply(AMQPSender.scala:37)
at net.liftweb.amqp.AMQPSender$$anonfun$loop$1.apply(AMQPSender.scala:36)
at scala.actors.ReactorTask.run(ReactorTask.scala:31)
at scala.actors.Reactor$class.resumeReceiver(Reactor.scala:129)
at net.liftweb.amqp.AMQPSender.scala$actors$ReplyReactor$$super$resumeReceiver(AMQPSender.scala:15)
at scala.actors.ReplyReactor$class.resumeReceiver(ReplyReactor.scala:68)
at net.liftweb.amqp.AMQPSender.resumeReceiver(AMQPSender.scala:15)
at scala.actors.Actor$class.searchMailbox(Actor.scala:500)
at net.liftweb.amqp.AMQPSender.searchMailbox(AMQPSender.scala:15)
at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Reactor.scala:117)
at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114)
at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114)
at scala.actors.ReactorTask.run(ReactorTask.scala:33)
at scala.concurrent.forkjoin.ForkJoinPool$AdaptedRunnable.exec(ForkJoinPool.java:611)
at scala.concurrent.forkjoin.ForkJoinTask.quietlyExec(ForkJoinTask.java:422)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.mainLoop(ForkJoinWorkerThread.java:340)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:325)
On Tue, Jul 5, 2011 at 10:10 AM, Meredith Gregory <lgreg.meredith at gmail.com> wrote:
> Dear Rabbitters,
>
> We've got a situation -- it must be quite common -- where, in the simplest
> case, we have two application nodes, call them A and B. RabbitMQ server is
> running on the machines for both A and B. The B node has established its
> connection to exchange and queue, but does not know whether A has. In the
> case that A has, all is well in the world. When B sends a message to A over
> RabbitMQ, A happily processes it and returns a response message. However, if
> A has not established its connection to exchange and queue, then if B has
> sent a message to A on this exchange and queue, then when A establishes its
> connection -- not only does it not see the pending message, the next message
> B sends causes a Java exception to be thrown (see below). Now, we can
> arrange a bit of a handshake to work around this, and have done so. However,
> we'd like to know if there's a better or Rabbitier way of doing this.
>
> Best wishes,
>
> --greg
>
> com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channel
> at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:181)
> at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:276)
> at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:521)
> at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:507)
> at net.liftweb.amqp.AMQPSender.send(AMQPSender.scala:25)
> at net.liftweb.amqp.AMQPSender$$anonfun$loop$1.apply(AMQPSender.scala:32)
> at net.liftweb.amqp.AMQPSender$$anonfun$loop$1.apply(AMQPSender.scala:31)
> at scala.actors.ReactorTask.run(ReactorTask.scala:31)
> at scala.actors.Reactor$class.resumeReceiver(Reactor.scala:129)
> at net.liftweb.amqp.AMQPSender.scala$actors$ReplyReactor$$super$resumeReceiver(AMQPSender.scala:15)
> at scala.actors.ReplyReactor$class.resumeReceiver(ReplyReactor.scala:68)
> at net.liftweb.amqp.AMQPSender.resumeReceiver(AMQPSender.scala:15)
> at scala.actors.Actor$class.searchMailbox(Actor.scala:500)
> at net.liftweb.amqp.AMQPSender.searchMailbox(AMQPSender.scala:15)
> at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Reactor.scala:117)
> at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114)
> at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114)
> at scala.actors.ReactorTask.run(ReactorTask.scala:33)
> at scala.concurrent.forkjoin.ForkJoinPool$AdaptedRunnable.exec(ForkJoinPool.java:611)
> at scala.concurrent.forkjoin.ForkJoinTask.quietlyExec(ForkJoinTask.java:422)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.mainLoop(ForkJoinWorkerThread.java:340)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:325)
>
>
>
> --
> L.G. Meredith
> Managing Partner
> Biosimilarity LLC
> 7329 39th Ave SW
> Seattle, WA 98136
>
> +1 206.650.3740
>
> http://biosimilarity.blogspot.com
>
>
--
L.G. Meredith
Managing Partner
Biosimilarity LLC
7329 39th Ave SW
Seattle, WA 98136
+1 206.650.3740
http://biosimilarity.blogspot.com