[rabbitmq-discuss] Message Aggregating Queue

Alexis Richardson alexis at rabbitmq.com
Thu Apr 28 15:09:32 BST 2011


As Irmo points out you could take Tim's approach further and "have the
consumer do the filtering itself by storing all messages in a key
value store and empty this based on a ordered queue of ids to process"

To that end try pumping the messages you want to cache into Redis.
Cc'ing the author.


On Thu, Apr 28, 2011 at 2:29 PM, Jason Zaugg <jzaugg at gmail.com> wrote:
> On Thu, Apr 28, 2011 at 3:01 PM, Alexis Richardson <alexis at rabbitmq.com> wrote:
>> On Thu, Apr 28, 2011 at 1:54 PM, Jason Zaugg <jzaugg at gmail.com> wrote:
>>> Another requirement I should make explicit: we have a single consumer
>>> reading the stream of price updates for all stocks; and it should
>>> process them the order. That is, we don't want to 'UBS' to move to the
>>> back of the queue if it updates before the previous message is
>>> processed. So a data structure like as LinkedHashMap would be needed
>>> for the custom queue proposed by Alvaro.
>> I'm confused.  Would this stream contain exactly one price per ticker
>> symbol?  If so then why does it have to be a stream and not a set?  If
>> it must be a stream with multiple values per ticker, then just use
>> direct exchanges and one queue for the full stream.
> We have one stream of market data updates being fed to a consumer (or
> a pool of consumers) that need to keep up with the updates. But we
> know something about these messages: a newer update for a stock
> obsoletes any unprocessed updates for the same stock. We can provide
> the unique ID as a routing key/header/... . We want to exploit this
> fact to ensure that the memory usage of the broker is bounded by the
> number of stocks rather than the number of unprocessed ticket; to
> avoid unnecessary work in the Consumer; and to improve latency.
> I don't want to make too many assumptions about what configuration of
> exchange(s) and queue(s) is required (largely because my knowledge of
> Rabbit is a little bit thin).
> Here's how I'd do this in code (uncompiled and ignoring thread safety)
> case class Ticker(name: String)
> case class PriceUpdate(ticker: Ticker, price: Double)
> class Ref[T](var value: T)
> val queue = new mutable.LinkedHashMap[Ticker, Ref[Message]]
> def enqueue(update: PriceUpdate) {
>  queue.get(update.ticker) match {
>    case Some(ref) => ref.value = update
>    case None => queue += (update.ticker, new Ref(update)
> }
> def dequeue: Option[PriceUpdate] = {
>  queue.reverseIterator.headOption {
>      case Some((ticker, ref)) =>
>         queue.remove(ticker)
>         Some(ref.value)
>      case None =>
>         None
>  }
> enqueue(PriceUpdate(Ticker("UBS"), 12.30)
> enqueue(PriceUpdate(Ticker("ABB"), 22.30))
> enqueue(PriceUpdate(Ticker("UBS"), 12.28))
> assert(dequeue == Some(PriceUpdate(Ticker("UBS"), 12.28))
> assert(dequeue == Some(PriceUpdate(Ticker("ABB"), 12.38))
> assert(dequeue == None)
>> Are you sure you wouldn't be better off using a cache?
> Could you elaborate on this option?
> Thanks for all your advice so far,
> -jason

More information about the rabbitmq-discuss mailing list