[rabbitmq-discuss] please help understanding why my pool of channels are blocking.

Steve Powell steve at rabbitmq.com
Fri May 4 12:01:15 BST 2012


Hi Guatam,

At first sight, this looks like it should work -- your (large) number of
tasks, will get gated through the 20 threads, which will use the 50
channels. It ought to be the case that there are always more than enough
channels to be 'in use' at any one time. The thread pool will dispatch
the tasks to threads as they become available....

I'll experiment to see if I can reproduce your experience, and get back
to you next week (after the bank holiday w/e here).

Steve Powell  (a happy bunny)
----------some more definitions from the SPD----------
chinchilla (n.) Cooling device for the lower jaw.
socialcast (n.) Someone to whom everyone is speaking but nobody likes.
literacy (n.) A textually transmitted disease usually contracted in childhood.

On 3 May 2012, at 04:34, Gautam Bakshi wrote:

> Hi Everyone,
> 
> I'm playing around with Rabbitmq(sorry for flooding the list lately) and am having some weird problems in my tests.  I have a multi-threaded application that I was considering using rabbitmq to manage the queue but I'm getting alot of blocking between my channels.  Is there a preferred way to setup pools?
> 
> I'm a bit new to Java so I used to the pools from apache commons but when I profile the channels are all blocking each other.  To test if dedicated connections per thread worked better I made an example of it as well and it does not block when the same program has a single channel per connection.  
> 
> So my question, do channels block each other or am I doing something wrong(using java api wrong or misunderstanding connection/channels)?  Is there a more preferred way? Unrelated to the question, but I was also wondering is there any difference between channels and connections in terms of throughput(i.e. would there be any benefit of using dedicated connections ignoring the overhead in establishing/maintaining the connection)?
> 
> Here's the test code(it simply sends to a thread a bunch of data to write the queue). If it helps I'm using v2.7.1, installed from MacPorts):
> 
> 
> import org.apache.commons.pool.BasePoolableObjectFactory;
> import org.apache.commons.pool.ObjectPool;
> import org.apache.commons.pool.PoolableObjectFactory;
> import org.apache.commons.pool.impl.GenericObjectPool;
> 
> import java.io.IOException;
> import java.util.NoSuchElementException;
> import java.util.Set;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.LinkedBlockingDeque;
> import java.util.concurrent.ThreadPoolExecutor;
> import java.util.concurrent.TimeUnit;
> 
> 
> import com.rabbitmq.client.ConnectionFactory;
> import com.rabbitmq.client.Connection;
> import com.rabbitmq.client.Channel;
> import com.rabbitmq.client.MessageProperties;
> 
> public class PoolExample {
>     
> 	private static ExecutorService executor_worker;
> 	
> 	static {
>         final int numberOfThreads_ThreadPoolExecutor = 20;
>         executor_worker = Executors.newFixedThreadPool(numberOfThreads_ThreadPoolExecutor);
> 	}
> 	
>     public static void main(String[] args) throws Exception {
>     	System.out.println("starting.."); 	   		
>     	ObjectPool<Channel> pool =
>                 new GenericObjectPool<Channel>(
>                 new ConnectionPoolableObjectFactory(), 50);
>     	for (int x = 0; x<500000000; x++) {
>     		executor_worker.submit(new MyRunnable(x, pool));
>     	}
>     }
> }
> 
> /*
>  //this is a version that creates its own connection per channel
>  class ConnectionPoolableObjectFactory
>         extends BasePoolableObjectFactory<Channel> {
> 
>     private static final ConnectionFactory factory = newConnectionFactory();
> 
>     private static ConnectionFactory newConnectionFactory() {
>         ConnectionFactory factory = new ConnectionFactory();
>         factory.setHost("localhost");
>         return factory;
>     }
> 
>     @Override
>     public Channel makeObject() throws Exception {
>         System.out.println("new channel");
>     	return factory.newConnection().createChannel();
>         
>     }
> 
>     @Override
>     public boolean validateObject(Channel channel) {
>         return channel.isOpen();
>     }
> 
>     @Override
>     public void destroyObject(Channel channel) throws Exception {
>     	System.out.println("closing");
>         channel.close();
>     }
> 
>     @Override
>     public void passivateObject(Channel channel) throws Exception {
>         //System.out.println("sent back to queue");
>     }
> }
> */
> 
> //this version pools channels 
> class ConnectionPoolableObjectFactory extends BasePoolableObjectFactory<Channel> {
> 
> 	private final Connection connection;
> 
> 	ConnectionPoolableObjectFactory() throws IOException {
> 		ConnectionFactory factory = new ConnectionFactory();
> 		factory.setHost("localhost");
> 		connection = factory.newConnection();
> 	}
> 
> 	@Override
> 	public Channel makeObject() throws Exception {
> 		return connection.createChannel();
> 	}
> 
> 	@Override
> 	public boolean validateObject(Channel channel) {
> 		return channel.isOpen();
> 	}
> 
> 	@Override
> 	public void destroyObject(Channel channel) throws Exception {
> 		channel.close();
> 	}
> 
> 	@Override
> 	public void passivateObject(Channel channel) throws Exception {
> 		//do nothing
> 	}
> }
> 
> 
> class MyRunnable implements Runnable{  
> 	protected int x = 0;
> 	protected ObjectPool<Channel> pool;
> 	
>     public MyRunnable(int x, ObjectPool<Channel> pool) {
> 		// TODO Auto-generated constructor stub
>     	this.x = x;
>     	this.pool = pool;
> 	}
> 
> 	public void run(){
>         try {
>                 Channel channel = pool.borrowObject();
>             	String message = Integer.toString(x);
>             	channel.basicPublish( "", "task_queue", 
>                         MessageProperties.PERSISTENT_TEXT_PLAIN,
>                         message.getBytes());
>             	pool.returnObject(channel);
> 		} catch (NoSuchElementException e) {
> 			// TODO Auto-generated catch block
> 			e.printStackTrace();
> 		} catch (IllegalStateException e) {
> 			// TODO Auto-generated catch block
> 			e.printStackTrace();
> 		} catch (Exception e) {
> 			// TODO Auto-generated catch block
> 			e.printStackTrace();
> 		} 
>     }
> }
> 
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss at lists.rabbitmq.com
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss

-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20120504/9a50fcd0/attachment.htm>


More information about the rabbitmq-discuss mailing list