[rabbitmq-discuss] Message Aggregating Queue

Alvaro Videla videlalvaro at gmail.com
Thu Apr 28 13:13:48 BST 2011


Hi Guys,

I spent some time last night reading the RabbitMQ implementation of exchanges and queues. These are my conclusions/ideas:

1) I wanted something like what Tim described in his reply, that is, some sort of K/V queue that just overwrites whatever is stored for key *abc*. That way the consumer always gets the latest value for a key, and all earlier values are discarded.
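
To make those semantics concrete, here is a minimal sketch in Python rather than Erlang. It is not tied to the rabbit_backing_queue behaviour; the class and method names (LastValueQueue, publish, get) are made up for illustration, and the key is assumed to be the routing key or a well-known header.

```python
from collections import OrderedDict


class LastValueQueue:
    """Keeps at most one pending message per key; newer values overwrite older ones."""

    def __init__(self):
        # key -> latest undelivered message, in order of first arrival
        self._pending = OrderedDict()

    def publish(self, key, message):
        # Assigning to an existing key keeps its position in the OrderedDict,
        # so a newer tick takes the slot of the older one for the same key.
        self._pending[key] = message

    def get(self):
        """Return the oldest pending (key, message) pair, or None if empty."""
        if not self._pending:
            return None
        return self._pending.popitem(last=False)

    def __len__(self):
        return len(self._pending)


q = LastValueQueue()
q.publish("tick.UBS", {"stock": "UBS", "bid": 12.3})
q.publish("tick.ABB", {"stock": "ABB", "bid": 15.3})
q.publish("tick.UBS", {"stock": "UBS", "bid": 12.28})  # replaces the first UBS tick
```

With Jason's three ticks, only two messages remain pending, and the UBS entry holds the 12.28 bid.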

To achieve that we could change the value of *backing_queue_module* in the .app file. The problem is that this value is global to the server, which means we could only use the new type of queue and not the default one (please correct me if I'm wrong). So I checked RabbitMQ's code to see whether there could be an extension to the protocol where, on queue declaration, we specify the module that implements the queue, that is, the *backing_queue_module*.

Then the value passed on declaration could be stored in Mnesia by augmenting the #amqqueue{} record with a new field called *module* or something like that. By default that value would be *rabbit_variable_queue*; otherwise it would be the value specified on queue.declare.

The problem is the function rabbit_amqqueue:stop/0, in particular this snippet:
		
		{ok, BQ} = application:get_env(rabbit, backing_queue_module),
		ok = BQ:stop().

Since we have no access to the #amqqueue{} record there, it's not possible to bind the BQ variable to something meaningful. Also, if I'm not mistaken, the only place where rabbit_amqqueue:stop/0 is called is one of the tests. If that's right, then it's just a matter of modifying the test.

So in short, I think it is possible to extend the protocol so that during 'queue.declare' we can specify which kind of queue we want. That information could be stored in Mnesia using the #amqqueue{} record. The new queue module will have to implement the rabbit_backing_queue behaviour. Since for this particular use case the messages could be kept just in memory, I think the implementation shouldn't get too complex.
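
As a rough illustration of the per-queue selection, here is a sketch in Python (not RabbitMQ code). The argument name "x-backing-queue-module" is hypothetical, as are declare_queue and stop_all; the only assumptions taken from above are the *rabbit_variable_queue* default and a *module* field stored with each queue.

```python
DEFAULT_BACKING_MODULE = "rabbit_variable_queue"

# Hypothetical argument name: queue.declare carries an arguments table,
# but RabbitMQ does not define this key.
BACKING_MODULE_ARG = "x-backing-queue-module"


def declare_queue(name, arguments=None):
    """Build the record that would be stored for a newly declared queue."""
    arguments = arguments or {}
    # Fall back to the default module when the client does not ask for one.
    module = arguments.get(BACKING_MODULE_ARG, DEFAULT_BACKING_MODULE)
    return {"name": name, "arguments": arguments, "module": module}


def stop_all(queues, stop_module):
    """Stop each distinct backing module once, instead of one global module."""
    for module in sorted({q["module"] for q in queues}):
        stop_module(module)


ticks = declare_queue("ticks", {BACKING_MODULE_ARG: "last_value_queue"})
orders = declare_queue("orders")
```

Here stop_all stands in for one possible replacement of the global lookup in rabbit_amqqueue:stop/0: it walks the stored queue records and stops every module that is actually in use.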

2) Another way of implementing this could be with a custom exchange:

The exchange would receive the message, purge the queue, and then publish the message. I know this is *hackish*, but it should work. The problem, as Jon Brisbin stated in another thread, is that we don't have the queue pids when routing messages through the custom exchange interface.

3) Regarding the problem of calculating the average/aggregation and flushing the proper message to the consumer once it finishes with the message it is currently processing, I think that could be implemented as another queue type too. We could also configure aggregation function callbacks for each kind of queue/message.
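
A small sketch of what such a callback could look like, again in Python and purely illustrative. AggregatingQueue and aggregate_tick are made-up names; the field names follow Jason's AggregatedTick example, and the callback is assumed to receive the pending aggregate (or None) plus the new message.

```python
def aggregate_tick(old, new):
    """Combine the pending aggregate (or None) with a newly arrived tick."""
    bid = new["bid"]
    if old is None:
        return {"stock": new["stock"], "latestBid": bid, "minBid": bid, "maxBid": bid}
    return {
        "stock": new["stock"],
        "latestBid": bid,
        "minBid": min(old["minBid"], bid),
        "maxBid": max(old["maxBid"], bid),
    }


class AggregatingQueue:
    """Holds one aggregate per key, built by a user-supplied combine callback."""

    def __init__(self, combine):
        self._combine = combine
        self._pending = {}  # key -> current aggregate

    def publish(self, key, message):
        self._pending[key] = self._combine(self._pending.get(key), message)

    def get(self, key):
        """Hand the current aggregate for key to a consumer and reset it."""
        return self._pending.pop(key, None)


q = AggregatingQueue(aggregate_tick)
q.publish("tick.UBS", {"stock": "UBS", "bid": 12.3})
q.publish("tick.UBS", {"stock": "UBS", "bid": 12.28})
ubs = q.get("tick.UBS")
```

Because the aggregate is reset when it is handed out, ticks that arrive while the consumer is busy start a fresh aggregate, which is flushed on the next get.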

I hope this helps and that I didn't go too crazy at some points :)

Cheers,

Alvaro

On Apr 28, 2011, at 11:18 AM, Tim Fox wrote:

> On 28/04/11 10:06, Alexis Richardson wrote:
>> Jason
>> 
>> Interesting stuff :-)
>> 
>> Prior discussions on this subject have tended to the view that the
>> exchange is the best place to cache a 'last value'.
>> 
>> In your example below, the way to identify that #1 and #3 refer to the
>> same stock would be to use the stock ticker (eg "UBS") as the routing
>> key in a direct exchange.
> The other systems I know of that implement "last value queues", HornetQ and Qpid, do it by having a special "queue" type which doesn't actually queue anything, but instead maintains a map of value_key => message.
> 
> As new messages arrive with the same value of value_key they are put into the map, overwriting any previous value for that same key.
> 
> The value_key is passed in the message in a header with a well known name.
> 
> This is actually very simple to implement and doesn't require you to route things in special ways.
>> See for example this prototype: https://github.com/squaremo/rabbitmq-lvc-plugin
>> 
>> An aggregation capability is also doable but it's not obvious what the
>> best way to do it is.  It depends whether aggregate messages are the
>> same type as their constituents.  If they are not, as in your example,
>> then you might want to republish a new aggregate message whenever
>> something changes, and then cache aggregates using the LVC approach.
>> 
>> alexis
>> 
>> 
>> 
>> On Thu, Apr 28, 2011 at 9:28 AM, Jason Zaugg<jzaugg at gmail.com>  wrote:
>>> Among other things, we're using RabbitMQ to distribute market data
>>> ticks. The interesting characteristic about this stream of messages is
>>> that a new tick for a stock obsoletes previous, unprocessed ticks for the
>>> same stock.
>>> 
>>> After a little brainstorming last night with Alvaro Videla, I'm
>>> curious to discuss how this could be modeled with an extension to
>>> RabbitMQ.
>>> 
>>> It seems a similar problem to the Queue Based TTL [1], however TTL
>>> isn't quite right for this case, as the validity period of a tick
>>> depends on the liquidity of the stock. We really just want the
>>> *latest* tick.
>>> 
>>> Assume a message stream:
>>> 
>>> 1. Tick { stock = "UBS", bid = 12.3 }   RK="tick.UBS"
>>> 2. Tick { stock = "ABB", bid = 15.3 }  RK="tick.ABB"
>>> 3. Tick { stock = "UBS", bid = 12.28 } RK="tick.UBS"
>>> 
>>> These are sent to a fanout exchange, and on to a queue(s) for a
>>> Consumer(s). Assume that consumer is slow, and the messages are not
>>> processed immediately. Rather than just enqueuing message #3, I would
>>> like to insert it in place of message #1.
>>> 
>>> This has the nice property that the broker won't overrun with messages
>>> if the consumer can't keep up; and that the consumer doesn't do work
>>> that is obsolete.
>>> 
>>> What would be the suitable way to identify that #1 and #3 refer to the
>>> same stock? Is the routing key of message #1 retained after it has
>>> been enqueued?
>>> 
>>> A generalization of this would be to provide a function that takes
>>> the old and new message and combines them into an aggregated message.
>>> For example, we might want to track the latest, min and max:
>>> 
>>>  AggregatedTick { stock = "UBS", latestBid = 12.28, minBid = 12.28, maxBid = 12.30 }
>>> 
>>> So, does this sound sensible and possible?
>>> 
>>> -jason
>>> 
>>> [1] http://www.rabbitmq.com/extensions.html#queue-ttl
>>> _______________________________________________
>>> rabbitmq-discuss mailing list
>>> rabbitmq-discuss at lists.rabbitmq.com
>>> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>>> 

Sent from my Nokia 1100




