[rabbitmq-discuss] Message Aggregating Queue

Tim Fox tim at rabbitmq.com
Thu Apr 28 10:18:33 BST 2011


On 28/04/11 10:06, Alexis Richardson wrote:
> Jason
>
> Interesting stuff :-)
>
> Prior discussions on this subject have tended to the view that the
> exchange is the best place to cache a 'last value'.
>
> In your example below, the way to identify that #1 and #3 refer to the
> same stock would be to use the stock ticker (eg "UBS") as the routing
> key in a direct exchange.
Other systems that I know of that implement "last value queues", such as
HornetQ and Qpid, do it with a special "queue" type which doesn't
actually queue anything, but instead maintains a map of value_key => message.

As new messages arrive with the same value of value_key they are put 
into the map, overwriting any previous value for that same key.

The value_key is passed in the message in a header with a well-known name.

This is actually very simple to implement and doesn't require you to 
route things in special ways.
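
As a rough illustration of the map-based approach described above, here is a
minimal Python sketch. The class name `LastValueQueue`, the header name
`value_key` and the dict-shaped messages are all assumptions made for the
example; this is not the actual HornetQ or Qpid API, just the idea in miniature.

```python
from collections import OrderedDict


class LastValueQueue:
    """Holds at most one pending message per value key (illustrative only)."""

    def __init__(self, key_header="value_key"):
        self.key_header = key_header   # hypothetical well-known header name
        self._pending = OrderedDict()  # value_key -> latest message

    def publish(self, message):
        key = message["headers"][self.key_header]
        # Overwrites any earlier message for the same key. An existing key
        # keeps its original position in the delivery order.
        self._pending[key] = message

    def consume(self):
        """Return the latest message for the oldest pending key, or None."""
        if not self._pending:
            return None
        _, message = self._pending.popitem(last=False)
        return message


q = LastValueQueue()
q.publish({"headers": {"value_key": "UBS"}, "bid": 12.3})
q.publish({"headers": {"value_key": "ABB"}, "bid": 15.3})
q.publish({"headers": {"value_key": "UBS"}, "bid": 12.28})

first = q.consume()   # the UBS slot, now holding the 12.28 tick
second = q.consume()  # the ABB tick
third = q.consume()   # nothing left
```

With Jason's three ticks, a slow consumer sees only two messages: the newer
UBS tick takes the place of the older one, and the pending set never grows
beyond one entry per key.
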
> See for example this prototype: https://github.com/squaremo/rabbitmq-lvc-plugin
>
> An aggregation capability is also doable but it's not obvious what the
> best way to do it is.  It depends whether aggregate messages are the
> same type as their constituents.  If they are not, as in your example,
> then you might want to republish a new aggregate message whenever
> something changes, and then cache aggregates using the LVC approach.
>
> alexis
>
>
>
> On Thu, Apr 28, 2011 at 9:28 AM, Jason Zaugg <jzaugg at gmail.com> wrote:
>> Among other things, we're using RabbitMQ to distribute market data
>> ticks. The interesting characteristic about this stream of messages is
>> that a new tick for a stock obsoletes previous, unprocessed ticks for the
>> same stock.
>>
>> After a little brainstorming last night with Alvaro Videla, I'm
>> curious to discuss how this could be modeled with an extension to
>> RabbitMQ.
>>
>> It seems a similar problem to the Queue Based TTL [1], however TTL
>> isn't quite right for this case, as the validity period of a tick
>> depends on the liquidity of the stock. We really just want the
>> *latest* tick.
>>
>> Assume a message stream:
>>
>> 1. Tick { stock = "UBS", bid = 12.3 }   RK="tick.UBS"
>> 2. Tick { stock = "ABB", bid = 15.3 }  RK="tick.ABB"
>> 3. Tick { stock = "UBS", bid = 12.28 } RK="tick.UBS"
>>
>> These are sent to a fanout exchange, and on to a queue(s) for a
>> Consumer(s). Assume that consumer is slow, and the messages are not
>> processed immediately. Rather than just enqueuing message #3, I would
>> like to insert it in place of message #1.
>>
>> This has the nice property that the broker won't overrun with messages
>> if the consumer can't keep up; and that the consumer doesn't do work
>> that is obsolete.
>>
>> What would be the suitable way to identify that #1 and #3 refer to the
>> same stock? Is the routing key of message #1 retained after it has
>> been enqueued?
>>
>> A generalization of this would be to provide a function that takes
>> the old and new message and combines them into an aggregated message.
>> For example, we might want to track the latest, min and max:
>>
>>   AggregatedTick { stock = "UBS", latestBid = 12.28, minBid = 12.28, maxBid = 12.30 }
>>
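
The combining function Jason describes could be sketched roughly as follows in
Python. The `AggregatedTick` fields mirror his example; the `combine` function,
its signature and the `pending` dict are hypothetical names chosen for
illustration, not part of RabbitMQ or any existing plugin.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class AggregatedTick:
    stock: str
    latest_bid: float
    min_bid: float
    max_bid: float


def combine(old: Optional[AggregatedTick], stock: str, bid: float) -> AggregatedTick:
    """Fold a new bid into the pending aggregate for one stock."""
    if old is None:
        # First tick for this stock: latest, min and max all start equal.
        return AggregatedTick(stock, bid, bid, bid)
    return AggregatedTick(stock, bid, min(old.min_bid, bid), max(old.max_bid, bid))


# Hypothetical per-queue state: stock -> aggregate not yet consumed.
pending = {}
for stock, bid in [("UBS", 12.3), ("ABB", 15.3), ("UBS", 12.28)]:
    pending[stock] = combine(pending.get(stock), stock, bid)
```

After the three ticks, `pending["UBS"]` carries latest 12.28, min 12.28 and
max 12.3, matching the AggregatedTick in the example. Plain last-value
behaviour is the special case where `combine` simply returns the new message.
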
>> So, does this sound sensible and possible?
>>
>> -jason
>>
>> [1] http://www.rabbitmq.com/extensions.html#queue-ttl
>> _______________________________________________
>> rabbitmq-discuss mailing list
>> rabbitmq-discuss at lists.rabbitmq.com
>> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>>


