Dear Rabbitters,<div><br></div><div>It turns out our problem is even simpler. If we have a client from machine B connect to the RabbitMQ broker on machine A and then begin sending messages, the broker closes the connection unless we fire up a client to connect to the broker on machine A to begin reading the messages. The number of messages before closure depends on the size of the message. i have a trace below. i will endeavor to get you a much smaller piece of code for repro. </div>
<div><br></div><div>We have been able to reproduce this error with virtually every java client version from 1.4 to the current 2.5.1. All tests are conducted on Mac OS X Snow Leopard.</div>
<div><br></div><div>Best wishes,</div><div><br></div><div>--greg</div><div><br></div><div><div>Welcome to Scala version 2.9.0.1 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_24).</div><div>Type in expressions to have them evaluated.</div>
<div>Type :help for more information.</div><div><br></div><div>scala&gt; :load /Users/lgm/work/src/projex/stellar/mdp4tw/SpecialK.moniker/scripts/rabbit-connectivity.scala</div><div>rabbit-connectivity.scala</div><div>Loading /Users/lgm/work/src/projex/stellar/mdp4tw/SpecialK.moniker/scripts/rabbit-connectivity.scala...</div>
<div>import com.biosimilarity.lift.model.store._</div><div>import com.biosimilarity.lift.model.store.usage._</div><div>import com.biosimilarity.lift.lib._</div><div>import com.biosimilarity.lift.lib.usage._</div><div>import scala.util.continuations._</div>
<div>defined module RabbitMQConnectivityTest</div><div><br></div><div>scala&gt; import RabbitMQConnectivityTest._</div><div>import RabbitMQConnectivityTest._</div><div>import RabbitMQConnectivityTest._</div><div><br></div>
<div>scala&gt; import AMQPDefaults._</div><div>import AMQPDefaults._</div><div>import AMQPDefaults._</div><div><br></div><div>scala&gt; import MonadicAMQPUnitTest._</div><div>import MonadicAMQPUnitTest._</div><div>import MonadicAMQPUnitTest._</div>
<div><br></div><div>scala&gt; import scala.collection.mutable.HashMap</div><div>import scala.collection.mutable.HashMap</div><div>import scala.collection.mutable.HashMap</div><div><br></div><div>scala&gt; val fqn1 = freshQueueName</div>
<div>val fqn1 = freshQueueName</div><div>fqn1: String = amqp_db6b2925-23cb-46d3-af1a-8b1ffdc0639b</div><div><br></div><div>scala&gt; val ( sma1, jsdnr, jsdsp, msgs ) = setupConnectionApparatus( fqn1, &quot;10.0.1.9&quot;, &quot;10.0.1.5&quot;, 100, new HashMap[String,List[Msg]]() )</div>
<div>1.9&quot;, &quot;10.0.1.5&quot;, 100, new HashMap[String,List[Msg]]() )</div><div>sma1: com.biosimilarity.lift.lib.SMJATwistedPair[com.biosimilarity.lift.lib.usage.MonadicAMQPUnitTest.Msg] = com.biosimilarity.lift.lib.SMJATwistedPair@4c900608</div>
<div>jsdnr: net.liftweb.amqp.JSONAMQPSender = net.liftweb.amqp.JSONAMQPSender@37c1e7d</div><div>jsdsp: com.biosimilarity.lift.lib.StdMonadicJSONAMQPDispatcher[com.biosimilarity.lift.lib.usage.MonadicAMQPUnitTest.Msg] = com.biosimilarity.lift.lib.StdMonadicJSONAMQPDispatcher@49b5a254</div>
<div>msgs: scala.collection.immutable.Stream[com.biosimilarity.lift.lib.usage.MonadicAMQPUnitTest.Msg] = Stream(Msg(msg0,0,true,None), ?)</div><div><br></div><div>scala&gt; sma1.send( msgs( 89 ) )</div><div>sma1.send( msgs( 89 ) )</div>
<div><br></div><div>scala&gt; sma1.send( msgs( 88 ) )</div><div>sma1.send( msgs( 88 ) )</div><div><br></div><div>scala&gt; net.liftweb.amqp.JSONAMQPSender@37c1e7d: caught com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channel</div>
<div>com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channel</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:181)</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:276)</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:521)</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:507)</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>at net.liftweb.amqp.AMQPSender.send(AMQPSender.scala:30)</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>at net.liftweb.amqp.AMQPSender$$anonfun$loop$1.apply(AMQPSender.scala:37)</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>at net.liftweb.amqp.AMQPSender$$anonfun$loop$1.apply(AMQPSender.scala:36)</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>at scala.actors.ReactorTask.run(ReactorTask.scala:31)</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>at scala.actors.Reactor$class.resumeReceiver(Reactor.scala:129)</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>at net.liftweb.amqp.AMQPSender.scala$actors$ReplyReactor$$super$resumeReceiver(AMQPSender.scala:15)</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>at scala.actors.ReplyReactor$class.resumeReceiver(ReplyReactor.scala:68)</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>at net.liftweb.amqp.AMQPSender.resumeReceiver(AMQPSender.scala:15)</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>at scala.actors.Actor$class.searchMailbox(Actor.scala:500)</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>at net.liftweb.amqp.AMQPSender.searchMailbox(AMQPSender.scala:15)</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Reactor.scala:117)</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114)</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114)</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>at scala.actors.ReactorTask.run(ReactorTask.scala:33)</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>at scala.concurrent.forkjoin.ForkJoinPool$AdaptedRunnable.exec(ForkJoinPool.java:611)</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>at scala.concurrent.forkjoin.ForkJoinTask.quietlyExec(ForkJoinTask.java:422)</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>at scala.concurrent.forkjoin.ForkJoinWorkerThread.mainLoop(ForkJoinWorkerThread.java:340)</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:325)</div><div><br></div><br><div class="gmail_quote">On Tue, Jul 5, 2011 at 10:10 AM, Meredith Gregory <span dir="ltr">&lt;<a href="mailto:lgreg.meredith@gmail.com" target="_blank">lgreg.meredith@gmail.com</a>&gt;</span> wrote:<br>

<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">Dear Rabbitters,<div><br></div><div>We&#39;ve got a situation -- it must be quite common -- where, in the simplest case, we have two application nodes, call them A and B. RabbitMQ server is running on the machines for both A and B. The B node has established it&#39;s connection to exchange and queue, but does not know whether A has. In the case that A has, all is well in the world. When B sends a message to A over RabbitMQ, A happily processes it and returns a response message. However, if A has not established it&#39;s connection to exchange and queue, then if B has sent a message to A on this exchange and queue, then when A establishes it&#39;s connection -- not only does it not see the pending message, the next message B sends causes a Java exception to be thrown (see below). Now, we can arrange a bit of a handshake to work around this, and have done so. However, we&#39;d like to know if there&#39;s a better or Rabbitier way of doing this.</div>



<div><br></div><div>Best wishes,</div><div><br></div><div>--greg</div><div><br></div><div><span style="font-family:arial, sans-serif;font-size:13px"><pre style="font-size:12px;white-space:pre-wrap;max-width:80em;padding-left:0.7em">
com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channelscala&gt;
        at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:181)
        at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:276)
        at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:521)
        at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:507)
        at net.liftweb.amqp.AMQPSender.send(AMQPSender.scala:25)
        at net.liftweb.amqp.AMQPSender$$anonfun$loop$1.apply(AMQPSender.scala:32)
        at net.liftweb.amqp.AMQPSender$$anonfun$loop$1.apply(AMQPSender.scala:31)
        at scala.actors.ReactorTask.run(ReactorTask.scala:31)
        at scala.actors.Reactor$class.resumeReceiver(Reactor.scala:129)
        at net.liftweb.amqp.AMQPSender.scala$actors$ReplyReactor$$super$resumeReceiver(AMQPSender.scala:15)
        at scala.actors.ReplyReactor$class.resumeReceiver(ReplyReactor.scala:68)
        at net.liftweb.amqp.AMQPSender.resumeReceiver(AMQPSender.scala:15)
        at scala.actors.Actor$class.searchMailbox(Actor.scala:500)
        at net.liftweb.amqp.AMQPSender.searchMailbox(AMQPSender.scala:15)
        at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Reactor.scala:117)
        at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114)
        at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114)
        at scala.actors.ReactorTask.run(ReactorTask.scala:33)
        at scala.concurrent.forkjoin.ForkJoinPool$AdaptedRunnable.exec(ForkJoinPool.java:611)
        at scala.concurrent.forkjoin.ForkJoinTask.quietlyExec(ForkJoinTask.java:422)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.mainLoop(ForkJoinWorkerThread.java:340)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:325)
</pre><div><br></div></span><br>-- <br>L.G. Meredith<br>Managing Partner<br>Biosimilarity LLC<br>7329 39th Ave SW<div>Seattle, WA 98136<br><br><a href="tel:%2B1%20206.650.3740" value="+12066503740" target="_blank">+1 206.650.3740</a><br>


<br><a href="http://biosimilarity.blogspot.com" target="_blank">http://biosimilarity.blogspot.com</a></div>
<br>
</div>
</blockquote></div><br><br clear="all"><br>-- <br>L.G. Meredith<br>Managing Partner<br>Biosimilarity LLC<br>7329 39th Ave SW<div>Seattle, WA 98136<br><br><a href="tel:%2B1%20206.650.3740" value="+12066503740" target="_blank">+1 206.650.3740</a><br>
<br><a href="http://biosimilarity.blogspot.com" target="_blank">http://biosimilarity.blogspot.com</a></div>
<br>
</div>