Hi Everyone,<div><br></div><div>I&#39;m playing around with Rabbitmq(sorry for flooding the list lately) and am having some weird problems in my tests. �I have a multi-threaded application that I was considering using rabbitmq to manage the queue but I&#39;m getting alot of blocking between my channels. �Is there a�preferred�way to setup pools?
</div><div><br></div><div>I&#39;m a bit new to Java so I used to the pools from apache commons but when I profile the channels are all blocking each other. �To test if dedicated connections per thread worked better I made an example of it as well and it does not block when the same program has a single channel per connection. �</div>
<div><br></div><div>So my question, do channels block each other or am I doing something wrong(using java api wrong or misunderstanding connection/channels)? �Is there a more�preferred�way? Unrelated to the question, but I was also wondering is there any difference between channels and connections in terms of throughput(i.e. would there be any�benefit�of using dedicated connections ignoring the overhead in�establishing/maintaining the connection)?</div>
<div><br></div><div>Here&#39;s the test code(it simply sends to a thread a bunch of data to write the queue). If it helps I&#39;m using v2.7.1, installed from MacPorts):</div><div><br></div><div><br></div><div><div>import org.apache.commons.pool.BasePoolableObjectFactory;</div>
<div>import org.apache.commons.pool.ObjectPool;</div><div>import org.apache.commons.pool.PoolableObjectFactory;</div><div>import org.apache.commons.pool.impl.GenericObjectPool;</div><div><br></div><div>import java.io.IOException;</div>
<div>import java.util.NoSuchElementException;</div><div>import java.util.Set;</div><div>import java.util.concurrent.ExecutorService;</div><div>import java.util.concurrent.Executors;</div><div>import java.util.concurrent.LinkedBlockingDeque;</div>
<div>import java.util.concurrent.ThreadPoolExecutor;</div><div>import java.util.concurrent.TimeUnit;</div><div><br></div><div><br></div><div>import com.rabbitmq.client.ConnectionFactory;</div><div>import com.rabbitmq.client.Connection;</div>
<div>import com.rabbitmq.client.Channel;</div><div>import com.rabbitmq.client.MessageProperties;</div><div><br></div><div>public class PoolExample {</div><div>� ��</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>private static ExecutorService executor_worker;</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span></div><div><span class="Apple-tab-span" style="white-space:pre">        </span>static {</div><div>� � � � final int numberOfThreads_ThreadPoolExecutor = 20;</div>
<div>� � � � executor_worker = Executors.newFixedThreadPool(numberOfThreads_ThreadPoolExecutor);</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div><span class="Apple-tab-span" style="white-space:pre">        </span></div>
<div>� � public static void main(String[] args) throws Exception {</div><div>� � <span class="Apple-tab-span" style="white-space:pre">        </span>System.out.println(&quot;starting..&quot;); <span class="Apple-tab-span" style="white-space:pre">        </span> � <span class="Apple-tab-span" style="white-space:pre">                </span></div>
<div>� � <span class="Apple-tab-span" style="white-space:pre">        </span>ObjectPool&lt;Channel&gt; pool =</div><div>� � � � � � � � new GenericObjectPool&lt;Channel&gt;(</div><div>� � � � � � � � new ConnectionPoolableObjectFactory(), 50);</div>
<div>� � <span class="Apple-tab-span" style="white-space:pre">        </span>for (int x = 0; x&lt;500000000; x++) {</div><div>� � <span class="Apple-tab-span" style="white-space:pre">                </span>executor_worker.submit(new MyRunnable(x, pool));</div>
<div>� � <span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div>� � }</div><div>}</div><div><br></div><div>/*</div><div>�//this is a version that creates its own connection per channel</div><div>�class ConnectionPoolableObjectFactory</div>
<div>� � � � extends BasePoolableObjectFactory&lt;Channel&gt; {</div><div><br></div><div>� � private static final ConnectionFactory factory = newConnectionFactory();</div><div><br></div><div>� � private static ConnectionFactory newConnectionFactory() {</div>
<div>� � � � ConnectionFactory factory = new ConnectionFactory();</div><div>� � � � factory.setHost(&quot;localhost&quot;);</div><div>� � � � return factory;</div><div>� � }</div><div><br></div><div>� � @Override</div><div>
� � public Channel makeObject() throws Exception {</div><div>� � � � System.out.println(&quot;new channel&quot;);</div><div>� � <span class="Apple-tab-span" style="white-space:pre">        </span>return factory.newConnection().createChannel();</div>
<div>� � � ��</div><div>� � }</div><div><br></div><div>� � @Override</div><div>� � public boolean validateObject(Channel channel) {</div><div>� � � � return channel.isOpen();</div><div>� � }</div><div><br></div><div>� � @Override</div>
<div>� � public void destroyObject(Channel channel) throws Exception {</div><div>� � <span class="Apple-tab-span" style="white-space:pre">        </span>System.out.println(&quot;closing&quot;);</div><div>� � � � channel.close();</div>
<div>� � }</div><div><br></div><div>� � @Override</div><div>� � public void passivateObject(Channel channel) throws Exception {</div><div>� � � � //System.out.println(&quot;sent back to queue&quot;);</div><div>� � }</div>
<div>}</div><div>*/</div><div><br></div><div>//this version pools channels�</div><div>class ConnectionPoolableObjectFactory extends BasePoolableObjectFactory&lt;Channel&gt; {</div><div><br></div><div><span class="Apple-tab-span" style="white-space:pre">        </span>private final Connection connection;</div>
<div><br></div><div><span class="Apple-tab-span" style="white-space:pre">        </span>ConnectionPoolableObjectFactory() throws IOException {</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>ConnectionFactory factory = new ConnectionFactory();</div>
<div><span class="Apple-tab-span" style="white-space:pre">                </span>factory.setHost(&quot;localhost&quot;);</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>connection = factory.newConnection();</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div><br></div><div><span class="Apple-tab-span" style="white-space:pre">        </span>@Override</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>public Channel makeObject() throws Exception {</div>
<div><span class="Apple-tab-span" style="white-space:pre">                </span>return connection.createChannel();</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div><br></div><div><span class="Apple-tab-span" style="white-space:pre">        </span>@Override</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>public boolean validateObject(Channel channel) {</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>return channel.isOpen();</div><div>
<span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div><br></div><div><span class="Apple-tab-span" style="white-space:pre">        </span>@Override</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>public void destroyObject(Channel channel) throws Exception {</div>
<div><span class="Apple-tab-span" style="white-space:pre">                </span>channel.close();</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div><br></div><div><span class="Apple-tab-span" style="white-space:pre">        </span>@Override</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>public void passivateObject(Channel channel) throws Exception {</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>//do nothing</div><div>
<span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div>}</div><div><br></div><div><br></div><div>class MyRunnable implements Runnable{ �</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>protected int x = 0;</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>protected ObjectPool&lt;Channel&gt; pool;</div><div><span class="Apple-tab-span" style="white-space:pre">        </span></div><div>� � public MyRunnable(int x, ObjectPool&lt;Channel&gt; pool) {</div>
<div><span class="Apple-tab-span" style="white-space:pre">                </span>// TODO Auto-generated constructor stub</div><div>� � <span class="Apple-tab-span" style="white-space:pre">        </span>this.x = x;</div><div>� � <span class="Apple-tab-span" style="white-space:pre">        </span>this.pool = pool;</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div><br></div><div><span class="Apple-tab-span" style="white-space:pre">        </span>public void run(){</div><div>� � � � try {</div><div>� � � � � � � � Channel channel = pool.borrowObject();</div>
<div>� � � � � � <span class="Apple-tab-span" style="white-space:pre">        </span>String message = Integer.toString(x);</div><div>� � � � � � <span class="Apple-tab-span" style="white-space:pre">        </span>channel.basicPublish( &quot;&quot;, &quot;task_queue&quot;,�</div>
<div>� � � � � � � � � � � � MessageProperties.PERSISTENT_TEXT_PLAIN,</div><div>� � � � � � � � � � � � message.getBytes());</div><div>� � � � � � <span class="Apple-tab-span" style="white-space:pre">        </span>pool.returnObject(channel);</div>
<div><span class="Apple-tab-span" style="white-space:pre">                </span>} catch (NoSuchElementException e) {</div><div><span class="Apple-tab-span" style="white-space:pre">                        </span>// TODO Auto-generated catch block</div><div>
<span class="Apple-tab-span" style="white-space:pre">                        </span>e.printStackTrace();</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>} catch (IllegalStateException e) {</div><div><span class="Apple-tab-span" style="white-space:pre">                        </span>// TODO Auto-generated catch block</div>
<div><span class="Apple-tab-span" style="white-space:pre">                        </span>e.printStackTrace();</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>} catch (Exception e) {</div><div><span class="Apple-tab-span" style="white-space:pre">                        </span>// TODO Auto-generated catch block</div>
<div><span class="Apple-tab-span" style="white-space:pre">                        </span>e.printStackTrace();</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>}�</div><div>� � }</div><div>}</div><div><br></div></div>