[rabbitmq-discuss] Multi-threaded Dispatch in Java Client

Rob Harrop rob at rabbitmq.com
Fri Oct 1 18:54:10 BST 2010


Recently I pushed branch bug18384 that changes the way callbacks are sent to Consumer implementations.

Following this change, the Connection maintains a dispatch thread that is used to send callbacks to the Consumers. This frees Consumers to call blocking methods on the Connection and Channel.

A question came up on Twitter about making this configurable, allowing for a custom Executor to be plugged into the ConnectionFactory. I wanted to outline why this is complicated, discuss a possible implementation and see if there is much interest.

First off, we should establish that each Consumer should only receive callbacks in a single thread. If this is not the case, then chaos will ensue and Consumers will need to worry about their own thread safety beyond that of initialisation safety.

With only a single dispatch thread for all Consumers, this Consumer-Thread pairing is easily honoured.

When we introduce multiple threads, we have to ensure that each Consumer is paired with only one thread. With the Executor abstraction, this rules out wrapping each callback dispatch in a Runnable and submitting it to the Executor, because you cannot guarantee which thread will run it.
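To make the point above concrete, here is a small sketch using only java.util.concurrent. The class and method names are made up for illustration and nothing here is RabbitMQ client code. It submits two callbacks for the same (imaginary) Consumer to a two-thread pool, with the first callback still running when the second is submitted, and records which threads ran them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration; not part of the RabbitMQ Java client. */
class PerCallbackSubmission {

    /** Returns the names of the threads that ran two callbacks for one consumer. */
    static Set<String> threadsUsedForOneConsumer() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Set<String> threads = ConcurrentHashMap.newKeySet();
        CountDownLatch secondStarted = new CountDownLatch(1);
        try {
            // First callback: deliberately still running when the second is submitted.
            executor.execute(() -> {
                threads.add(Thread.currentThread().getName());
                try {
                    secondStarted.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Second callback for the same consumer: the pool hands it to its other thread.
            executor.execute(() -> {
                threads.add(Thread.currentThread().getName());
                secondStarted.countDown();
            });
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threads;
    }
}
```

In this setup the returned set holds two different thread names, so the same Consumer has been called back on two threads at once.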

To get around this, the Executor can be set to run 'n' long-running tasks (n being the number of threads in the Executor). Each of these tasks pulls dispatch instructions off a queue and executes them. Each Consumer is paired with one dispatch instruction queue, probably assigned on a round-robin basis. This is not too complicated and will provide simple balancing of dispatch load across the threads in the Executor.
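A minimal sketch of this queue-per-thread scheme follows. It is only an illustration under stated assumptions: RoundRobinDispatcher and its methods are hypothetical names, the thread count is assumed to be known up front, a fixed-size pool stands in for the Executor, and a "dispatch instruction" is modelled as a plain Runnable. It uses Java 8 syntax for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch; not part of the RabbitMQ Java client. */
class RoundRobinDispatcher {
    // Sentinel that tells a long-running task to stop draining its queue.
    private static final Runnable POISON = () -> { };

    private final List<BlockingQueue<Runnable>> queues = new ArrayList<>();
    private final Map<Object, BlockingQueue<Runnable>> assignments = new ConcurrentHashMap<>();
    private final AtomicInteger next = new AtomicInteger();
    private final ExecutorService executor;

    RoundRobinDispatcher(int threadCount) {
        executor = Executors.newFixedThreadPool(threadCount);
        // One queue and one long-running task per thread.
        for (int i = 0; i < threadCount; i++) {
            BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
            queues.add(queue);
            executor.execute(() -> drain(queue));
        }
    }

    private static void drain(BlockingQueue<Runnable> queue) {
        try {
            for (Runnable r = queue.take(); r != POISON; r = queue.take()) {
                try {
                    r.run();
                } catch (RuntimeException e) {
                    // A failing callback must not kill the dispatch thread.
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Queues a callback; a consumer is bound to one queue on first use, round-robin. */
    void dispatch(Object consumer, Runnable callback) {
        assignments.computeIfAbsent(consumer,
                c -> queues.get(Math.floorMod(next.getAndIncrement(), queues.size())))
            .add(callback);
    }

    /** Lets queued callbacks finish, then stops the threads; false on timeout. */
    boolean shutdown(long timeoutMillis) {
        for (BlockingQueue<Runnable> queue : queues) {
            queue.add(POISON);
        }
        executor.shutdown();
        try {
            return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because each queue is drained by exactly one task, and each task occupies one pool thread for its whole life, every callback for a given consumer runs on the same thread and in submission order. The price is that a slow Consumer delays every other Consumer that shares its queue.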

Now, there are still some problems:

1. The number of threads in an Executor is not necessarily fixed (a ThreadPoolExecutor, for example, can grow and shrink its pool).
2. There is no way, via Executor or ExecutorService, to find out how many threads there are. Thus, we cannot know how many dispatch instruction queues to create.
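Both problems can be seen with a small probe. This is an illustration only and PoolSizeProbe is a made-up name. It builds a ThreadPoolExecutor with no core threads and reads getPoolSize() before and during some work. That method belongs to the concrete ThreadPoolExecutor class; the Executor and ExecutorService interfaces declare nothing comparable.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration; not part of the RabbitMQ Java client. */
class PoolSizeProbe {

    /** Returns the pool size before any work and while two tasks are blocked. */
    static int[] observePoolSizes() {
        // No core threads, up to 4 threads created on demand, idle ones expire after 60s.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 4, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        int before = pool.getPoolSize();   // no threads have been created yet
        pool.execute(blocked);
        pool.execute(blocked);
        int during = pool.getPoolSize();   // one thread per blocked task

        release.countDown();
        pool.shutdown();
        return new int[] { before, during };
    }
}
```

The pool size here goes from 0 to 2 as work arrives, so a count taken at one moment says little about the next, and code that only holds an Executor reference cannot ask at all.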

However, we can certainly introduce a ConnectionFactory.setDispatchThreadCount(int). Behind the scenes this will create a fixed-size pool via Executors.newFixedThreadPool(int), along with the matching number of dispatch queues and dispatch tasks.

I'm interested in hearing if anyone thinks I'm overlooking some simpler way of solving this, and indeed if this is even worth solving.

Regards,

Rob

