Hi Everyone,<div><br></div><div>I'm playing around with Rabbitmq(sorry for flooding the list lately) and am having some weird problems in my tests. I have a multi-threaded application that I was considering using rabbitmq to manage the queue but I'm getting alot of blocking between my channels. Is there a preferred way to setup pools?
</div><div><br></div><div>I'm a bit new to Java so I used to the pools from apache commons but when I profile the channels are all blocking each other. To test if dedicated connections per thread worked better I made an example of it as well and it does not block when the same program has a single channel per connection. </div>
<div><br></div><div>So my question, do channels block each other or am I doing something wrong(using java api wrong or misunderstanding connection/channels)? Is there a more preferred way? Unrelated to the question, but I was also wondering is there any difference between channels and connections in terms of throughput(i.e. would there be any benefit of using dedicated connections ignoring the overhead in establishing/maintaining the connection)?</div>
<div><br></div><div>Here's the test code(it simply sends to a thread a bunch of data to write the queue). If it helps I'm using v2.7.1, installed from MacPorts):</div><div><br></div><div><br></div><div><div>import org.apache.commons.pool.BasePoolableObjectFactory;</div>
<div>import org.apache.commons.pool.ObjectPool;</div><div>import org.apache.commons.pool.PoolableObjectFactory;</div><div>import org.apache.commons.pool.impl.GenericObjectPool;</div><div><br></div><div>import java.io.IOException;</div>
<div>import java.util.NoSuchElementException;</div><div>import java.util.Set;</div><div>import java.util.concurrent.ExecutorService;</div><div>import java.util.concurrent.Executors;</div><div>import java.util.concurrent.LinkedBlockingDeque;</div>
<div>import java.util.concurrent.ThreadPoolExecutor;</div><div>import java.util.concurrent.TimeUnit;</div><div><br></div><div><br></div><div>import com.rabbitmq.client.ConnectionFactory;</div><div>import com.rabbitmq.client.Connection;</div>
<div>import com.rabbitmq.client.Channel;</div><div>import com.rabbitmq.client.MessageProperties;</div><div><br></div><div>public class PoolExample {</div><div> </div><div><span class="Apple-tab-span" style="white-space:pre">        </span>private static ExecutorService executor_worker;</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span></div><div><span class="Apple-tab-span" style="white-space:pre">        </span>static {</div><div> final int numberOfThreads_ThreadPoolExecutor = 20;</div>
<div> executor_worker = Executors.newFixedThreadPool(numberOfThreads_ThreadPoolExecutor);</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div><span class="Apple-tab-span" style="white-space:pre">        </span></div>
<div> public static void main(String[] args) throws Exception {</div><div> <span class="Apple-tab-span" style="white-space:pre">        </span>System.out.println("starting.."); <span class="Apple-tab-span" style="white-space:pre">        </span> <span class="Apple-tab-span" style="white-space:pre">                </span></div>
<div> <span class="Apple-tab-span" style="white-space:pre">        </span>ObjectPool<Channel> pool =</div><div> new GenericObjectPool<Channel>(</div><div> new ConnectionPoolableObjectFactory(), 50);</div>
<div> <span class="Apple-tab-span" style="white-space:pre">        </span>for (int x = 0; x<500000000; x++) {</div><div> <span class="Apple-tab-span" style="white-space:pre">                </span>executor_worker.submit(new MyRunnable(x, pool));</div>
<div> <span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div> }</div><div>}</div><div><br></div><div>/*</div><div> //this is a version that creates its own connection per channel</div><div> class ConnectionPoolableObjectFactory</div>
<div> extends BasePoolableObjectFactory<Channel> {</div><div><br></div><div> private static final ConnectionFactory factory = newConnectionFactory();</div><div><br></div><div> private static ConnectionFactory newConnectionFactory() {</div>
<div> ConnectionFactory factory = new ConnectionFactory();</div><div> factory.setHost("localhost");</div><div> return factory;</div><div> }</div><div><br></div><div> @Override</div><div>
public Channel makeObject() throws Exception {</div><div> System.out.println("new channel");</div><div> <span class="Apple-tab-span" style="white-space:pre">        </span>return factory.newConnection().createChannel();</div>
<div> </div><div> }</div><div><br></div><div> @Override</div><div> public boolean validateObject(Channel channel) {</div><div> return channel.isOpen();</div><div> }</div><div><br></div><div> @Override</div>
<div> public void destroyObject(Channel channel) throws Exception {</div><div> <span class="Apple-tab-span" style="white-space:pre">        </span>System.out.println("closing");</div><div> channel.close();</div>
<div> }</div><div><br></div><div> @Override</div><div> public void passivateObject(Channel channel) throws Exception {</div><div> //System.out.println("sent back to queue");</div><div> }</div>
<div>}</div><div>*/</div><div><br></div><div>//this version pools channels </div><div>class ConnectionPoolableObjectFactory extends BasePoolableObjectFactory<Channel> {</div><div><br></div><div><span class="Apple-tab-span" style="white-space:pre">        </span>private final Connection connection;</div>
<div><br></div><div><span class="Apple-tab-span" style="white-space:pre">        </span>ConnectionPoolableObjectFactory() throws IOException {</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>ConnectionFactory factory = new ConnectionFactory();</div>
<div><span class="Apple-tab-span" style="white-space:pre">                </span>factory.setHost("localhost");</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>connection = factory.newConnection();</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div><br></div><div><span class="Apple-tab-span" style="white-space:pre">        </span>@Override</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>public Channel makeObject() throws Exception {</div>
<div><span class="Apple-tab-span" style="white-space:pre">                </span>return connection.createChannel();</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div><br></div><div><span class="Apple-tab-span" style="white-space:pre">        </span>@Override</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>public boolean validateObject(Channel channel) {</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>return channel.isOpen();</div><div>
<span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div><br></div><div><span class="Apple-tab-span" style="white-space:pre">        </span>@Override</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>public void destroyObject(Channel channel) throws Exception {</div>
<div><span class="Apple-tab-span" style="white-space:pre">                </span>channel.close();</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div><br></div><div><span class="Apple-tab-span" style="white-space:pre">        </span>@Override</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>public void passivateObject(Channel channel) throws Exception {</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>//do nothing</div><div>
<span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div>}</div><div><br></div><div><br></div><div>class MyRunnable implements Runnable{ </div><div><span class="Apple-tab-span" style="white-space:pre">        </span>protected int x = 0;</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>protected ObjectPool<Channel> pool;</div><div><span class="Apple-tab-span" style="white-space:pre">        </span></div><div> public MyRunnable(int x, ObjectPool<Channel> pool) {</div>
<div><span class="Apple-tab-span" style="white-space:pre">                </span>// TODO Auto-generated constructor stub</div><div> <span class="Apple-tab-span" style="white-space:pre">        </span>this.x = x;</div><div> <span class="Apple-tab-span" style="white-space:pre">        </span>this.pool = pool;</div>
<div><span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div><br></div><div><span class="Apple-tab-span" style="white-space:pre">        </span>public void run(){</div><div> try {</div><div> Channel channel = pool.borrowObject();</div>
<div> <span class="Apple-tab-span" style="white-space:pre">        </span>String message = Integer.toString(x);</div><div> <span class="Apple-tab-span" style="white-space:pre">        </span>channel.basicPublish( "", "task_queue", </div>
<div> MessageProperties.PERSISTENT_TEXT_PLAIN,</div><div> message.getBytes());</div><div> <span class="Apple-tab-span" style="white-space:pre">        </span>pool.returnObject(channel);</div>
<div><span class="Apple-tab-span" style="white-space:pre">                </span>} catch (NoSuchElementException e) {</div><div><span class="Apple-tab-span" style="white-space:pre">                        </span>// TODO Auto-generated catch block</div><div>
<span class="Apple-tab-span" style="white-space:pre">                        </span>e.printStackTrace();</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>} catch (IllegalStateException e) {</div><div><span class="Apple-tab-span" style="white-space:pre">                        </span>// TODO Auto-generated catch block</div>
<div><span class="Apple-tab-span" style="white-space:pre">                        </span>e.printStackTrace();</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>} catch (Exception e) {</div><div><span class="Apple-tab-span" style="white-space:pre">                        </span>// TODO Auto-generated catch block</div>
<div><span class="Apple-tab-span" style="white-space:pre">                        </span>e.printStackTrace();</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>} </div><div> }</div><div>}</div><div><br></div></div>