[rabbitmq-discuss] Message Aggregating Queue

Jason Zaugg jzaugg at gmail.com
Thu Apr 28 09:28:27 BST 2011


Among other things, we're using RabbitMQ to distribute market data
ticks. The interesting characteristic of this stream of messages is
that a new tick for a stock obsoletes any previous, unprocessed ticks
for the same stock.

After a little brainstorming last night with Alvaro Videla, I'm
curious to discuss how this could be modeled with an extension to
RabbitMQ.

It seems similar to the problem addressed by per-queue message TTL [1];
however, TTL isn't quite right for this case, as the validity period of
a tick depends on the liquidity of the stock. We really just want the
*latest* tick.

Assume a message stream:

1. Tick { stock = "UBS", bid = 12.3 }   RK="tick.UBS"
2. Tick { stock = "ABB", bid = 15.3 }   RK="tick.ABB"
3. Tick { stock = "UBS", bid = 12.28 }  RK="tick.UBS"

These are sent to a fanout exchange, and on to one or more queues, each
read by a consumer. Assume that a consumer is slow, and the messages are
not processed immediately. Rather than just enqueuing message #3, I would
like to insert it in place of message #1.

This has the nice property that the broker won't be overrun with
messages if the consumer can't keep up, and that the consumer doesn't
do work that is already obsolete.
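To make the intended semantics concrete, here is a minimal in-memory sketch of such a "conflating" queue, assuming messages are keyed by their routing key (one of the options raised in the question below). The `ConflatingQueue` and `Tick` names are hypothetical illustrations, not part of RabbitMQ or any client library; the sketch only models the desired broker behaviour.

```python
from collections import OrderedDict
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Tick:
    stock: str
    bid: float


class ConflatingQueue:
    """Hypothetical model: at most one pending message per key.

    A new message whose key is already pending replaces the pending one
    in place (keeping its queue position); otherwise it is appended.
    """

    def __init__(self) -> None:
        # key (assumed: the routing key) -> latest unprocessed message
        self._pending: "OrderedDict[str, Tick]" = OrderedDict()

    def publish(self, routing_key: str, tick: Tick) -> None:
        # Assigning to an existing key keeps its original position.
        self._pending[routing_key] = tick

    def consume(self) -> Optional[Tick]:
        if not self._pending:
            return None
        _, tick = self._pending.popitem(last=False)  # oldest position first
        return tick

    def __len__(self) -> int:
        return len(self._pending)


q = ConflatingQueue()
q.publish("tick.UBS", Tick("UBS", 12.3))
q.publish("tick.ABB", Tick("ABB", 15.3))
q.publish("tick.UBS", Tick("UBS", 12.28))
print(len(q))       # 2
print(q.consume())  # Tick(stock='UBS', bid=12.28)
print(q.consume())  # Tick(stock='ABB', bid=15.3)
```

Message #3 takes the slot that message #1 occupied, so the UBS update is still delivered before ABB, and the queue length is bounded by the number of distinct stocks rather than by the publish rate.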

What would be a suitable way to identify that #1 and #3 refer to the
same stock? Is the routing key of message #1 retained after it has
been enqueued?

A generalization of this would be to provide a function that takes
the old and new messages and combines them into an aggregated message.
For example, we might want to track the latest, min and max bids:

  AggregatedTick { stock = "UBS", latestBid = 12.28, minBid = 12.28, maxBid = 12.30 }
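A rough sketch of that generalization, again as a hypothetical in-memory model rather than a RabbitMQ feature: the queue is parameterized by a `start` function (first message for a key) and a `combine(old, new)` function (folds a newer message into the pending one). All names here (`AggregatingQueue`, `start_tick`, `combine_tick`, `AggregatedTick`) are illustrative assumptions, and keying by routing key is assumed as before.

```python
from collections import OrderedDict
from dataclasses import dataclass
from typing import Callable, Generic, Optional, TypeVar

M = TypeVar("M")  # incoming message type
A = TypeVar("A")  # aggregated message type


@dataclass(frozen=True)
class Tick:
    stock: str
    bid: float


@dataclass(frozen=True)
class AggregatedTick:
    stock: str
    latest_bid: float
    min_bid: float
    max_bid: float


def start_tick(t: Tick) -> AggregatedTick:
    # First pending tick for a stock: latest, min and max are the same bid.
    return AggregatedTick(t.stock, t.bid, t.bid, t.bid)


def combine_tick(old: AggregatedTick, new: Tick) -> AggregatedTick:
    # Fold a newer tick into the pending aggregate for the same stock.
    return AggregatedTick(
        stock=new.stock,
        latest_bid=new.bid,
        min_bid=min(old.min_bid, new.bid),
        max_bid=max(old.max_bid, new.bid),
    )


class AggregatingQueue(Generic[M, A]):
    def __init__(self, start: Callable[[M], A], combine: Callable[[A, M], A]) -> None:
        self._start = start
        self._combine = combine
        self._pending: "OrderedDict[str, A]" = OrderedDict()

    def publish(self, key: str, msg: M) -> None:
        if key in self._pending:
            # Combine in place: the key keeps its position in the queue.
            self._pending[key] = self._combine(self._pending[key], msg)
        else:
            self._pending[key] = self._start(msg)

    def consume(self) -> Optional[A]:
        if not self._pending:
            return None
        return self._pending.popitem(last=False)[1]


q = AggregatingQueue(start_tick, combine_tick)
q.publish("tick.UBS", Tick("UBS", 12.3))
q.publish("tick.ABB", Tick("ABB", 15.3))
q.publish("tick.UBS", Tick("UBS", 12.28))
print(q.consume())
# AggregatedTick(stock='UBS', latest_bid=12.28, min_bid=12.28, max_bid=12.3)
```

The plain "latest tick wins" behaviour is the special case `start = lambda t: t` with `combine = lambda old, new: new`, so a single hook of this shape would cover both use cases.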

So, does this sound sensible and possible?

-jason

[1] http://www.rabbitmq.com/extensions.html#queue-ttl

