[rabbitmq-discuss] please help understanding why my pool of channels are blocking.

Gautam Bakshi gautam.bakshi at gmail.com
Thu May 3 04:34:41 BST 2012


Hi Everyone,

I'm playing around with RabbitMQ (sorry for flooding the list lately) and am
having some weird problems in my tests.  I have a multi-threaded
application that I was considering using RabbitMQ for, to manage the queue,
but I'm getting a lot of blocking between my channels.  Is there a preferred
way to set up pools?

I'm a bit new to Java, so I used the pools from Apache Commons, but when I
profile, the channels are all blocking each other.  To test whether dedicated
connections per thread work better, I made an example of that as well, and
the same program does not block when each connection has a single channel.

So my question: do channels block each other, or am I doing something
wrong (using the Java API wrong, or misunderstanding connections/channels)?
Is there a more preferred way? Unrelated to the question, but I was also
wondering whether there is any difference between channels and connections
in terms of throughput (i.e. would there be any benefit to using dedicated
connections, ignoring the overhead of establishing/maintaining the
connection)?

Here's the test code (it simply hands a bunch of data to worker threads to
write to the queue). If it helps, I'm using v2.7.1 (installed from MacPorts):


import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.PoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;

import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

/**
 * Load-test driver: publishes a long run of integers to RabbitMQ from a fixed
 * set of worker threads, each task borrowing a {@link Channel} from a shared
 * object pool capped at 50 channels.
 */
public class PoolExample {

    /** Number of worker threads publishing concurrently. */
    private static final int WORKER_THREADS = 20;

    /** Maximum number of tasks allowed to wait in the executor's queue. */
    private static final int MAX_QUEUED_TASKS = 10000;

    private static ExecutorService executor_worker;

    static {
        // Executors.newFixedThreadPool uses an unbounded queue; with hundreds of
        // millions of submissions the backlog would grow until the heap is
        // exhausted. A bounded queue plus CallerRunsPolicy makes the submitting
        // thread run the task itself whenever the workers fall behind, which
        // throttles submission to the rate the workers can sustain.
        executor_worker = new ThreadPoolExecutor(
                WORKER_THREADS, WORKER_THREADS,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<Runnable>(MAX_QUEUED_TASKS),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Submits one publishing task per integer, then waits for the workers to
     * drain and releases the channel pool.
     *
     * @param args unused
     * @throws Exception if the pool factory cannot be created, the wait for
     *         the workers is interrupted, or closing the pool fails
     */
    public static void main(String[] args) throws Exception {
        System.out.println("starting..");
        ObjectPool<Channel> pool =
                new GenericObjectPool<Channel>(
                        new ConnectionPoolableObjectFactory(), 50);
        try {
            for (int x = 0; x < 500000000; x++) {
                executor_worker.submit(new MyRunnable(x, pool));
            }
        } finally {
            // Stop accepting work and let the queued tasks finish before the
            // channels they borrow from are torn down.
            executor_worker.shutdown();
            try {
                executor_worker.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
            } finally {
                // NOTE(review): this closes the pooled channels only; the
                // factory does not expose its Connection, so that stays open.
                pool.close();
            }
        }
    }
}

/*
 //this is a version that creates its own connection per channel
 class ConnectionPoolableObjectFactory
        extends BasePoolableObjectFactory<Channel> {

    private static final ConnectionFactory factory = newConnectionFactory();

    private static ConnectionFactory newConnectionFactory() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        return factory;
    }

    @Override
    public Channel makeObject() throws Exception {
        System.out.println("new channel");
    return factory.newConnection().createChannel();

    }

    @Override
    public boolean validateObject(Channel channel) {
        return channel.isOpen();
    }

    @Override
    public void destroyObject(Channel channel) throws Exception {
    System.out.println("closing");
        channel.close();
    }

    @Override
    public void passivateObject(Channel channel) throws Exception {
        //System.out.println("sent back to queue");
    }
}
*/

//this version pools channels
/**
 * Pool factory whose channels are all opened on one shared {@link Connection}
 * to a broker on {@code localhost}.
 */
class ConnectionPoolableObjectFactory extends
        BasePoolableObjectFactory<Channel> {

    /** The single connection every pooled channel is created on. */
    private final Connection connection;

    /**
     * Opens the shared connection to {@code localhost}.
     *
     * @throws IOException if the connection cannot be established
     */
    ConnectionPoolableObjectFactory() throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
    }

    /**
     * Creates a new channel on the shared connection.
     *
     * @return a freshly created channel
     * @throws Exception if the channel cannot be created
     */
    @Override
    public Channel makeObject() throws Exception {
        return connection.createChannel();
    }

    /**
     * Reports whether a pooled channel is still usable.
     *
     * @param channel the channel to check
     * @return {@code true} if the channel is open
     */
    @Override
    public boolean validateObject(Channel channel) {
        return channel.isOpen();
    }

    /**
     * Closes a channel that the pool is discarding.
     *
     * @param channel the channel being removed from the pool
     * @throws Exception if closing an open channel fails
     */
    @Override
    public void destroyObject(Channel channel) throws Exception {
        // Channels that failed validateObject are already closed; calling
        // close() on those raises AlreadyClosedException in the RabbitMQ
        // client, so only close channels that are still open.
        if (channel.isOpen()) {
            channel.close();
        }
    }

    /**
     * Called when a channel is returned to the pool; nothing needs resetting.
     *
     * @param channel the channel being returned
     */
    @Override
    public void passivateObject(Channel channel) throws Exception {
        // do nothing
    }
}


/**
 * Task that publishes a single integer, rendered as decimal text, to the
 * {@code task_queue} queue through a channel borrowed from a shared pool.
 */
class MyRunnable implements Runnable {

    /** The value to publish. */
    protected int x = 0;

    /** Pool the publishing channel is borrowed from and released to. */
    protected ObjectPool<Channel> pool;

    /**
     * @param x    the value to publish
     * @param pool the pool supplying channels
     */
    public MyRunnable(int x, ObjectPool<Channel> pool) {
        this.x = x;
        this.pool = pool;
    }

    /**
     * Borrows a channel, publishes the value as a persistent text message to
     * the default exchange with routing key {@code task_queue}, and always
     * hands the channel back to the pool. Failures are printed to stderr.
     */
    @Override
    public void run() {
        Channel channel = null;
        boolean published = false;
        try {
            channel = pool.borrowObject();
            String message = Integer.toString(x);
            channel.basicPublish("", "task_queue",
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes());
            published = true;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release on every path: a channel that is borrowed but never
            // handed back keeps counting against the pool's capacity.
            release(channel, published);
        }
    }

    /**
     * Gives a borrowed channel back to the pool: returned for reuse after a
     * successful publish, invalidated (discarded) after a failed one.
     */
    private void release(Channel channel, boolean published) {
        if (channel == null) {
            // borrowObject itself failed, so there is nothing to release.
            return;
        }
        try {
            if (published) {
                pool.returnObject(channel);
            } else {
                // The channel may be unusable after a failed publish, so it
                // must not be handed to another task.
                pool.invalidateObject(channel);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20120502/b0e244a6/attachment.htm>


More information about the rabbitmq-discuss mailing list