[rabbitmq-discuss] [Q] best way to add a sequencer to the broker
cremes.devlist at mac.com
Mon Dec 29 14:04:21 GMT 2008
In my application I would like to stamp each message on the bus with a
sequence number. Doing so allows me to replay events in order (amongst
other benefits). Some services will be publishing latency-sensitive
Each service connecting to the broker will publish to a topic
exchange. One to N other services may subscribe to the published data.
I foresee a topic hierarchy like so:
out.topic1 (passive, durable)
out.topic2 (passive, non-durable)
out.topic3.subtopic1 (passive, non-durable)
out.topic3.subtopic2 (passive, durable)
These exchanges are named with dot notation so that the names are easy
to parse with regular expressions. I realize an exchange name doesn't
need dot notation, whereas it is a requirement for routing keys.
All of my data is serialized as a JSON object prior to publishing. The
object (hash) contains a key called :sequence which is set to 0 by the
publisher. The sequencer service will subscribe to all out.# topics
(with '#' as the routing key so it gets everything), read the content
body as a Map, and set the map :sequence value to i++ where 'i' is a
32-bit (or 64-bit) integer. It will then republish the JSON object to
a new topic of the form "in.topicX" and pass through the original
routing key. In effect it routes between, for example, out.topic1 and
in.topic1, and adds a sequence number as a side effect.
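The stamping step described above can be sketched roughly as follows. This is only an illustration under some assumptions: SequenceStamper is a hypothetical helper (not part of the RabbitMQ client), the map key is written as the plain string "sequence" in place of :sequence, and the counter's starting value is passed in by the caller. A 64-bit AtomicLong is used so the increment stays safe if more than one consumer thread ever shares the stamper.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper for illustration; not part of the RabbitMQ client library.
final class SequenceStamper {
    // 64-bit counter holding the next sequence number to hand out.
    private final AtomicLong next;

    // 'first' is the first sequence number that will be assigned (an assumption;
    // the post only says the publisher sets the placeholder to 0).
    SequenceStamper(long first) {
        this.next = new AtomicLong(first);
    }

    // Returns a copy of the decoded JSON object with "sequence" overwritten.
    // The input map is left untouched. getAndIncrement() behaves like i++.
    Map<String, Object> stamp(Map<String, Object> message) {
        Map<String, Object> stamped = new HashMap<>(message);
        stamped.put("sequence", next.getAndIncrement());
        return stamped;
    }
}
```

With new SequenceStamper(1L), the first message stamped gets sequence 1 and the second gets 2, so a value of 0 still means "not yet stamped".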
I played around with the examples SimpleTopicPublisher and
SimpleTopicConsumer as a basis for a new class. I was able to add a
JSONReader and JSONWriter to the new java class I call Sequencer. It
successfully performs the work as described in the prior paragraph.
Right now I statically declare the in & out exchanges, but I see how I
could declare the "in" exchange based on the contents of the "out"
exchange name received in the envelope.
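Deriving the "in" exchange name from the "out" name could look something like the sketch below. ExchangeNames and inboundFor are hypothetical names, and the "out." / "in." prefixes simply follow the naming scheme above. In the Java client the incoming exchange name would presumably come from the delivery's Envelope (getExchange()).

```java
// Hypothetical helper for illustration; not part of the RabbitMQ client library.
final class ExchangeNames {
    private static final String OUT_PREFIX = "out.";
    private static final String IN_PREFIX = "in.";

    private ExchangeNames() {}

    // Maps e.g. "out.topic3.subtopic1" to "in.topic3.subtopic1".
    // Rejects names that do not start with "out." or have nothing after it.
    static String inboundFor(String outExchange) {
        if (outExchange == null
                || !outExchange.startsWith(OUT_PREFIX)
                || outExchange.length() == OUT_PREFIX.length()) {
            throw new IllegalArgumentException(
                    "not an out.* exchange: " + outExchange);
        }
        return IN_PREFIX + outExchange.substring(OUT_PREFIX.length());
    }
}
```

Rejecting anything that doesn't start with "out." keeps the sequencer from accidentally republishing a message it picked up from one of its own "in" exchanges.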
I have a few outstanding questions that I'm hoping some more
experienced folks can help answer.
1. This code will likely start before other services, so it will
declare the exchanges and block on them while waiting for traffic. The
topic Consumer/Producer examples all illustrate how to do this with a
single exchange. How do I accomplish this task with multiple
exchanges? Do I need a separate thread and channel for each exchange?
2. Is there a better way to accomplish this task? I've read some notes
about future releases of rabbitmq allowing for "internal" clients that
run inside the rabbitmq memory space. I think this is an ideal use of
that capability. Am I right or barking up the wrong tree? And does
that internal client need to be written in erlang or can it be java/c/
3. Is there a way to make this more dynamic so I do not have to
declare all the exchanges up-front in this sequencer code? Ideally
this service could detect that new exchanges were declared by other
services and automatically subscribe to them to do the sequence
stamping and routing.
Thanks for any feedback.