Hi Everyone,<div><br></div><div>I&#39;m playing around with Rabbitmq(sorry for flooding the list lately) and am having some weird problems in my tests.  I have a multi-threaded application that I was considering using rabbitmq to manage the queue but I&#39;m getting alot of blocking between my channels.  Is there a preferred way to setup pools?
</div><div><br></div><div>I&#39;m a bit new to Java so I used to the pools from apache commons but when I profile the channels are all blocking each other.  To test if dedicated connections per thread worked better I made an example of it as well and it does not block when the same program has a single channel per connection.  </div>
<div><br></div><div>So my question, do channels block each other or am I doing something wrong(using java api wrong or misunderstanding connection/channels)?  Is there a more preferred way? Unrelated to the question, but I was also wondering is there any difference between channels and connections in terms of throughput(i.e. would there be any benefit of using dedicated connections ignoring the overhead in establishing/maintaining the connection)?</div>
<div><br></div><div>Here&#39;s the test code(it simply sends to a thread a bunch of data to write the queue). If it helps I&#39;m using v2.7.1, installed from MacPorts):</div><div><br></div><div><br></div><div><div>import org.apache.commons.pool.BasePoolableObjectFactory;</div>
<div>import org.apache.commons.pool.ObjectPool;</div><div>import org.apache.commons.pool.PoolableObjectFactory;</div><div>import org.apache.commons.pool.impl.GenericObjectPool;</div><div><br></div><div>import java.io.IOException;</div>
<div>import java.util.NoSuchElementException;</div><div>import java.util.Set;</div><div>import java.util.concurrent.ExecutorService;</div><div>import java.util.concurrent.Executors;</div><div>import java.util.concurrent.LinkedBlockingDeque;</div>
<div>import java.util.concurrent.ThreadPoolExecutor;</div><div>import java.util.concurrent.TimeUnit;</div><div><br></div><div><br></div><div>import com.rabbitmq.client.ConnectionFactory;</div><div>import com.rabbitmq.client.Connection;</div>
<div>import com.rabbitmq.client.Channel;</div><div>import com.rabbitmq.client.MessageProperties;</div><div><br></div><div>public class PoolExample {</div><div>    </div><div><span class="Apple-tab-span" style="white-space:pre">        </span>private static ExecutorService executor_worker;</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span></div><div><span class="Apple-tab-span" style="white-space:pre">        </span>static {</div><div>        final int numberOfThreads_ThreadPoolExecutor = 20;</div>
<div>        executor_worker = Executors.newFixedThreadPool(numberOfThreads_ThreadPoolExecutor);</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div><span class="Apple-tab-span" style="white-space:pre">        </span></div>
<div>    public static void main(String[] args) throws Exception {</div><div>    <span class="Apple-tab-span" style="white-space:pre">        </span>System.out.println(&quot;starting..&quot;); <span class="Apple-tab-span" style="white-space:pre">        </span>   <span class="Apple-tab-span" style="white-space:pre">                </span></div>
<div>    <span class="Apple-tab-span" style="white-space:pre">        </span>ObjectPool&lt;Channel&gt; pool =</div><div>                new GenericObjectPool&lt;Channel&gt;(</div><div>                new ConnectionPoolableObjectFactory(), 50);</div>
<div>    <span class="Apple-tab-span" style="white-space:pre">        </span>for (int x = 0; x&lt;500000000; x++) {</div><div>    <span class="Apple-tab-span" style="white-space:pre">                </span>executor_worker.submit(new MyRunnable(x, pool));</div>
<div>    <span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div>    }</div><div>}</div><div><br></div><div>/*</div><div> //this is a version that creates its own connection per channel</div><div> class ConnectionPoolableObjectFactory</div>
<div>        extends BasePoolableObjectFactory&lt;Channel&gt; {</div><div><br></div><div>    private static final ConnectionFactory factory = newConnectionFactory();</div><div><br></div><div>    private static ConnectionFactory newConnectionFactory() {</div>
<div>        ConnectionFactory factory = new ConnectionFactory();</div><div>        factory.setHost(&quot;localhost&quot;);</div><div>        return factory;</div><div>    }</div><div><br></div><div>    @Override</div><div>
    public Channel makeObject() throws Exception {</div><div>        System.out.println(&quot;new channel&quot;);</div><div>    <span class="Apple-tab-span" style="white-space:pre">        </span>return factory.newConnection().createChannel();</div>
<div>        </div><div>    }</div><div><br></div><div>    @Override</div><div>    public boolean validateObject(Channel channel) {</div><div>        return channel.isOpen();</div><div>    }</div><div><br></div><div>    @Override</div>
<div>    public void destroyObject(Channel channel) throws Exception {</div><div>    <span class="Apple-tab-span" style="white-space:pre">        </span>System.out.println(&quot;closing&quot;);</div><div>        channel.close();</div>
<div>    }</div><div><br></div><div>    @Override</div><div>    public void passivateObject(Channel channel) throws Exception {</div><div>        //System.out.println(&quot;sent back to queue&quot;);</div><div>    }</div>
<div>}</div><div>*/</div><div><br></div><div>//this version pools channels </div><div>class ConnectionPoolableObjectFactory extends BasePoolableObjectFactory&lt;Channel&gt; {</div><div><br></div><div><span class="Apple-tab-span" style="white-space:pre">        </span>private final Connection connection;</div>
<div><br></div><div><span class="Apple-tab-span" style="white-space:pre">        </span>ConnectionPoolableObjectFactory() throws IOException {</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>ConnectionFactory factory = new ConnectionFactory();</div>
<div><span class="Apple-tab-span" style="white-space:pre">                </span>factory.setHost(&quot;localhost&quot;);</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>connection = factory.newConnection();</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div><br></div><div><span class="Apple-tab-span" style="white-space:pre">        </span>@Override</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>public Channel makeObject() throws Exception {</div>
<div><span class="Apple-tab-span" style="white-space:pre">                </span>return connection.createChannel();</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div><br></div><div><span class="Apple-tab-span" style="white-space:pre">        </span>@Override</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>public boolean validateObject(Channel channel) {</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>return channel.isOpen();</div><div>
<span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div><br></div><div><span class="Apple-tab-span" style="white-space:pre">        </span>@Override</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>public void destroyObject(Channel channel) throws Exception {</div>
<div><span class="Apple-tab-span" style="white-space:pre">                </span>channel.close();</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div><br></div><div><span class="Apple-tab-span" style="white-space:pre">        </span>@Override</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>public void passivateObject(Channel channel) throws Exception {</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>//do nothing</div><div>
<span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div>}</div><div><br></div><div><br></div><div>class MyRunnable implements Runnable{  </div><div><span class="Apple-tab-span" style="white-space:pre">        </span>protected int x = 0;</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>protected ObjectPool&lt;Channel&gt; pool;</div><div><span class="Apple-tab-span" style="white-space:pre">        </span></div><div>    public MyRunnable(int x, ObjectPool&lt;Channel&gt; pool) {</div>
<div><span class="Apple-tab-span" style="white-space:pre">                </span>// TODO Auto-generated constructor stub</div><div>    <span class="Apple-tab-span" style="white-space:pre">        </span>this.x = x;</div><div>    <span class="Apple-tab-span" style="white-space:pre">        </span>this.pool = pool;</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div><br></div><div><span class="Apple-tab-span" style="white-space:pre">        </span>public void run(){</div><div>        try {</div><div>                Channel channel = pool.borrowObject();</div>
<div>            <span class="Apple-tab-span" style="white-space:pre">        </span>String message = Integer.toString(x);</div><div>            <span class="Apple-tab-span" style="white-space:pre">        </span>channel.basicPublish( &quot;&quot;, &quot;task_queue&quot;, </div>
<div>                        MessageProperties.PERSISTENT_TEXT_PLAIN,</div><div>                        message.getBytes());</div><div>            <span class="Apple-tab-span" style="white-space:pre">        </span>pool.returnObject(channel);</div>
<div><span class="Apple-tab-span" style="white-space:pre">                </span>} catch (NoSuchElementException e) {</div><div><span class="Apple-tab-span" style="white-space:pre">                        </span>// TODO Auto-generated catch block</div><div>
<span class="Apple-tab-span" style="white-space:pre">                        </span>e.printStackTrace();</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>} catch (IllegalStateException e) {</div><div><span class="Apple-tab-span" style="white-space:pre">                        </span>// TODO Auto-generated catch block</div>
<div><span class="Apple-tab-span" style="white-space:pre">                        </span>e.printStackTrace();</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>} catch (Exception e) {</div><div><span class="Apple-tab-span" style="white-space:pre">                        </span>// TODO Auto-generated catch block</div>
<div><span class="Apple-tab-span" style="white-space:pre">                        </span>e.printStackTrace();</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>} </div><div>    }</div><div>}</div><div><br></div></div>