[rabbitmq-discuss] [Q] best way to add a sequencer to the broker

Chuck Remes cremes.devlist at mac.com
Mon Dec 29 14:04:21 GMT 2008

In my application I would like to stamp each message on the bus with a  
sequence number. Doing so allows me to replay events in order (amongst  
other benefits). Some services will be publishing latency-sensitive  

Each service connecting to the broker will publish to a topic  
exchange. One to N other services may subscribe to the published data.  
I foresee a topic hierarchy like so:

out.topic1 (passive, durable)
out.topic2 (passive, non-durable)
out.topic3.subtopic1 (passive, non-durable)
out.topic3.subtopic2 (passive, durable)

These exchanges are named with dot notation so that the names are easy  
to parse with regular expressions. I realize dot notation is optional  
for exchange names, whereas it is a requirement for routing keys.

All of my data is serialized as a JSON object prior to publishing. The  
object (hash) contains a key called :sequence which is set to 0 by the  
publisher. The sequencer service will subscribe to all out.# topics  
(with '#' as the routing key so it gets everything), read the content  
body as a Map, and set the map :sequence value to i++ where 'i' is a  
32-bit (or 64-bit) integer. It will then republish the JSON object to  
a new topic of the form "in.topicX" and pass through the original  
routing key. It's routing between, for example, out.topic1 and  
in.topic1 and adding a sequence number as a side effect.
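
The stamping step described above might be sketched roughly as follows.
This is only an illustration under assumptions, not the actual Sequencer
class: the class name SequenceStamper is hypothetical, the JSON body is
assumed to be already decoded into a Map, the key is assumed to appear
as "sequence" once serialized, and the counter is assumed to start at 1
so that a stamped message never carries the publisher's placeholder 0.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: stamps decoded message bodies with a sequence number.
final class SequenceStamper {
    // 64-bit counter; the starting value of 1 is an assumption.
    private final AtomicLong next = new AtomicLong(1);

    // Returns a copy of the message with "sequence" overwritten by the
    // next counter value. The input map is left untouched.
    Map<String, Object> stamp(Map<String, Object> message) {
        Map<String, Object> stamped = new HashMap<>(message);
        stamped.put("sequence", next.getAndIncrement());
        return stamped;
    }
}
```

An AtomicLong keeps the counter safe if more than one consumer thread
ends up calling stamp(), at the cost of a single point of ordering.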

I played around with the examples SimpleTopicPublisher and  
SimpleTopicConsumer as a basis for a new class. I was able to add a  
JSONReader and JSONWriter to the new Java class I call Sequencer. It  
successfully performs the work as described in the prior paragraph.  
Right now I statically declare the in & out exchanges, but I see how I  
could declare the "in" exchange based on the contents of the "out"  
exchange name received in the envelope.
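
Deriving the "in" exchange name from the "out" exchange name could look
something like the sketch below. The class and method names are
hypothetical, and it assumes every source exchange name starts with the
"out." prefix shown in the hierarchy above.

```java
// Hypothetical helper: maps an "out.*" exchange name to its "in.*" twin.
final class ExchangeNames {
    private static final String OUT_PREFIX = "out.";
    private static final String IN_PREFIX = "in.";

    // e.g. "out.topic3.subtopic1" becomes "in.topic3.subtopic1"
    static String inboundFor(String outExchange) {
        if (!outExchange.startsWith(OUT_PREFIX)) {
            throw new IllegalArgumentException(
                "not an out.* exchange: " + outExchange);
        }
        return IN_PREFIX + outExchange.substring(OUT_PREFIX.length());
    }
}
```

Rejecting names without the prefix keeps the sequencer from republishing
to an exchange it was never meant to touch.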

I have a few outstanding questions that I'm hoping some more  
experienced folks can help answer.

1. This code will likely start before other services, so it will  
declare the exchanges and block on them while waiting for traffic. The  
topic Consumer/Producer examples all illustrate how to do this with a  
single exchange. How do I accomplish this task with multiple  
exchanges? Do I need a separate thread and channel for each exchange?

2. Is there a better way to accomplish this task? I've read some notes  
about future releases of RabbitMQ allowing for "internal" clients that  
run inside the RabbitMQ memory space. I think this is an ideal use of  
that capability. Am I right or barking up the wrong tree? And does  
that internal client need to be written in Erlang or can it be java/c/ 

3. Is there a way to make this more dynamic so I do not have to  
declare all the exchanges up-front in this sequencer code? Ideally  
this service could detect that new exchanges were declared by other  
services and automatically subscribe to them to do the sequence  
stamping and routing.

Thanks for any feedback.

