[rabbitmq-discuss] WG: Message Aggregating Queue

Josh Geisser josh at gebaschtel.ch
Tue May 3 06:26:31 BST 2011


Let me quickly try to understand that:

You want to get the most accurate instrument data. As long as you're not overloaded, the sequential message processing is fine (queue size <= 1). The problem occurs when you can't keep up with the producers (queue size > 1). Then you have the case of consuming UBS at 12.3 although the very latest information (12.28) is already in the queue, but not yet processed, right?

A naïve way to overcome that would be a stack/FILO instead of a queue, no? Instead of consuming the next message, you'd consume the most recent one.
This way you get 12.28 before 12.3.

Of course this means you have to keep track of which tick is which (they are time-stamped, right?) when you're working down the stack.
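
RabbitMQ has no stack, but a consumer-side approximation of the idea would be to drain whatever is currently queued and only act on the newest tick per stock, using the timestamps. A rough sketch with the pika Python client (the queue name and the 'stock'/'ts' fields in the body are just assumptions):

  import json
  import pika

  connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  channel = connection.channel()

  def drain_latest(queue='ticks'):
      # pull everything currently queued and keep only the newest
      # tick per stock, judged by the 'ts' timestamp in the body
      latest = {}
      while True:
          method, properties, body = channel.basic_get(queue, auto_ack=True)
          if method is None:            # nothing left in the queue for now
              break
          tick = json.loads(body)
          seen = latest.get(tick['stock'])
          if seen is None or tick['ts'] > seen['ts']:
              latest[tick['stock']] = tick
      return latest

  for stock, tick in drain_latest().items():
      process(tick)                     # hypothetical business-logic step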

The further problem of messages starving at the bottom of the stack could be reduced if the broker could match and remove outdated messages with the same pattern.

afaik not possible with rabbit :(

My 2 cents :)

Cheers
Josh


-----Original Message-----
From: rabbitmq-discuss-bounces at lists.rabbitmq.com [mailto:rabbitmq-discuss-bounces at lists.rabbitmq.com] On Behalf Of Jason Zaugg
Sent: Thursday, 28 April 2011 16:18
To: Irmo Manie
Cc: Alexis Richardson; rabbitmq-discuss at lists.rabbitmq.com
Subject: Re: [rabbitmq-discuss] Message Aggregating Queue

On Thu, Apr 28, 2011 at 3:44 PM, Irmo Manie <irmo.manie at gmail.com> wrote:
> The problem is actually a bit more tricky than this. When it comes to
> market data you would route all the data to a client-specific
> exclusive queue based on a subscription, because every client has its
> own authorization, authentication and quality of service (real-time,
> delayed, only EU stocks, tick by tick, a tick only every minute, etc.).

But each client-specific queue could implement the 'obsolete tick'
discarding in the broker, right?

> So the easy way is still just to have the consumer do the filtering
> itself, by storing all messages in a key-value store and emptying it
> based on an ordered queue of ids to process.
> That way the consumer can 'consume' as fast as it can put the values
> in the K/V store, which is probably always fast enough :-)
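
If I follow, that pattern would be roughly this sketch (Python with pika; a plain dict stands in for the K/V store, and the queue name, field names and process() step are made up):

  import json
  import threading
  import time
  from collections import deque

  import pika

  store = {}                # stock -> latest tick (the K/V store)
  pending = deque()         # ordered keys still waiting to be processed
  lock = threading.Lock()

  def on_tick(ch, method, properties, body):
      tick = json.loads(body)
      with lock:
          if tick['stock'] not in store:
              pending.append(tick['stock'])   # remember arrival order, once per key
          store[tick['stock']] = tick         # a newer tick simply overwrites the older one

  def worker():
      while True:
          with lock:
              key = pending.popleft() if pending else None
              tick = store.pop(key, None) if key else None
          if tick is None:
              time.sleep(0.01)                # nothing to process yet
              continue
          process(tick)                       # hypothetical business-logic step

  connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  channel = connection.channel()
  channel.basic_consume(queue='client.ticks', on_message_callback=on_tick, auto_ack=True)
  threading.Thread(target=worker, daemon=True).start()
  channel.start_consuming()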

I would feel more comfortable knowing that the number of messages in
the broker is naturally bounded, even if the consumer misbehaves. It
would also be nice to have the possibility of a pool of consumers
processing from a single queue, to be able to restart the consumers
without losing unprocessed messages, etc. Anyway, we should discuss
this some more internally; as always, these sorts of cats are amenable
to being skinned in a multitude of ways :)

> Only if you can process the data (ticks) independently from each other
> does it make sense to have this filtering on the broker, because then it
> would be useful for a cluster of consumer apps processing the data.
> But 9 times out of 10 you need the ticks of more than one instrument
> to do your business logic, so you'd already keep a cache with the last
> values anyway.

> Still there could be other use cases, of course, where having this
> functionality at the broker can be really useful and powerful.

> /2cents

And well worth both of them :)

-jason
_______________________________________________
rabbitmq-discuss mailing list
rabbitmq-discuss at lists.rabbitmq.com
https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss


-----Original Message-----

Among other things, we're using RabbitMQ to distribute market data ticks. The interesting characteristic of this stream of messages is that a new tick for a stock obsoletes previous, unprocessed ticks for the same stock.

After a little brainstorming last night with Alvaro Videla, I'm curious to discuss how this could be modeled with an extension to RabbitMQ.

It seems a similar problem to the queue-based TTL extension [1]; however, TTL isn't quite right for this case, as the validity period of a tick depends on the liquidity of the stock. We really just want the
*latest* tick.
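
For comparison, per-queue message TTL is just a queue argument. A pika sketch, with the queue name and the 60-second value made up:

  import pika

  connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  channel = connection.channel()
  # every message in this queue expires 60 seconds after being enqueued,
  # whether or not a newer tick has superseded it
  channel.queue_declare(queue='ticks.delayed',
                        arguments={'x-message-ttl': 60000})

But that expires ticks by age, not because they were superseded, which is why it doesn't quite fit here.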

Assume a message stream:

1. Tick { stock = "UBS", bid = 12.3 }   RK="tick.UBS"
2. Tick { stock = "ABB", bid = 15.3 }   RK="tick.ABB"
3. Tick { stock = "UBS", bid = 12.28 }  RK="tick.UBS"

These are sent to a fanout exchange, and on to one or more queues for one or more consumers. Assume that a consumer is slow, and the messages are not processed immediately. Rather than just enqueuing message #3, I would like to insert it in place of message #1.

This has the nice property that the broker won't be overrun with messages if the consumer can't keep up, and that the consumer doesn't do work that is obsolete.
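
For concreteness, the publishing side described above might look roughly like this with the pika Python client (the exchange/queue names and the JSON body are just assumptions for the sketch):

  import json
  import pika

  connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  channel = connection.channel()

  # a fanout exchange with one consumer queue bound to it
  channel.exchange_declare(exchange='ticks', exchange_type='fanout')
  channel.queue_declare(queue='consumer1.ticks')
  channel.queue_bind(queue='consumer1.ticks', exchange='ticks')

  # messages #1-#3 from the stream above
  for stock, bid in [('UBS', 12.3), ('ABB', 15.3), ('UBS', 12.28)]:
      channel.basic_publish(
          exchange='ticks',
          routing_key='tick.%s' % stock,   # not used for fanout routing, but carried with the message
          body=json.dumps({'stock': stock, 'bid': bid}))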

What would be a suitable way to identify that #1 and #3 refer to the same stock? Is the routing key of message #1 retained after it has been enqueued?

A generalization of this would be to provide a function that takes the old and new message and combines them into an aggregated message.
For example, we might want to track the latest, min and max:

  AggregatedTick { stock = "UBS", latestBid = 12.28, minBid = 12.28, maxBid = 12.30 }
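
A sketch of the shape of such a combine function (plain Python, with the field names from the example above; this is just to illustrate the hook, not an existing RabbitMQ API):

  # 'old' is the unconsumed aggregate already sitting in the queue (or None),
  # 'new' is the freshly published tick for the same stock
  def combine(old, new):
      if old is None:
          return {'stock': new['stock'], 'latestBid': new['bid'],
                  'minBid': new['bid'], 'maxBid': new['bid']}
      return {'stock': new['stock'],
              'latestBid': new['bid'],
              'minBid': min(old['minBid'], new['bid']),
              'maxBid': max(old['maxBid'], new['bid'])}

Feeding the two UBS ticks above through it gives latestBid = 12.28, minBid = 12.28, maxBid = 12.30.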

So, does this sound sensible and possible?

-jason

[1] http://www.rabbitmq.com/extensions.html#queue-ttl

