[rabbitmq-discuss] Message Aggregating Queue

Jason Zaugg jzaugg at gmail.com
Thu Apr 28 14:29:54 BST 2011


On Thu, Apr 28, 2011 at 3:01 PM, Alexis Richardson <alexis at rabbitmq.com> wrote:
> On Thu, Apr 28, 2011 at 1:54 PM, Jason Zaugg <jzaugg at gmail.com> wrote:
>> Another requirement I should make explicit: we have a single consumer
>> reading the stream of price updates for all stocks, and it should
>> process them in order. That is, we don't want 'UBS' to move to the
>> back of the queue if it updates before the previous message is
>> processed. So a data structure such as a LinkedHashMap would be needed
>> for the custom queue proposed by Alvaro.
>
> I'm confused.  Would this stream contain exactly one price per ticker
> symbol?  If so then why does it have to be a stream and not a set?  If
> it must be a stream with multiple values per ticker, then just use
> direct exchanges and one queue for the full stream.

We have one stream of market data updates being fed to a consumer (or
a pool of consumers) that must keep up with the updates. But we know
something about these messages: a newer update for a stock obsoletes
any unprocessed updates for the same stock. We can supply the stock's
unique ID as a routing key, a header, or similar. We want to exploit
this fact to bound the broker's memory usage by the number of stocks
rather than by the number of unprocessed ticks, to avoid unnecessary
work in the consumer, and to improve latency.

I don't want to make too many assumptions about what configuration of
exchange(s) and queue(s) is required (largely because my knowledge of
Rabbit is a little bit thin).

Here's how I'd do this in code (uncompiled and ignoring thread safety):

import scala.collection.mutable

case class Ticker(name: String)
case class PriceUpdate(ticker: Ticker, price: Double)
class Ref[T](var value: T)

// Insertion-ordered map: one slot per ticker, oldest ticker first
val queue = new mutable.LinkedHashMap[Ticker, Ref[PriceUpdate]]

def enqueue(update: PriceUpdate): Unit = {
  queue.get(update.ticker) match {
    // Newer price overwrites the pending one; the ticker keeps its place in line
    case Some(ref) => ref.value = update
    // First pending update for this ticker goes to the back of the queue
    case None => queue += (update.ticker -> new Ref(update))
  }
}

def dequeue: Option[PriceUpdate] = {
  // The head of a LinkedHashMap is the earliest-inserted entry
  queue.headOption match {
    case Some((ticker, ref)) =>
      queue.remove(ticker)
      Some(ref.value)
    case None =>
      None
  }
}

enqueue(PriceUpdate(Ticker("UBS"), 12.30))
enqueue(PriceUpdate(Ticker("ABB"), 22.30))
enqueue(PriceUpdate(Ticker("UBS"), 12.28))
assert(dequeue == Some(PriceUpdate(Ticker("UBS"), 12.28)))
assert(dequeue == Some(PriceUpdate(Ticker("ABB"), 22.30)))
assert(dequeue == None)
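The code above ignores thread safety. One way to add it, sketched here under the assumption that producer and consumer are threads in the same JVM (so this is an in-process structure, not a broker feature), is to guard an insertion-ordered map with a ReentrantLock and let the consumer block on a Condition until an update arrives. ConflatingQueue, offer and poll are hypothetical names chosen for this sketch.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

// Hypothetical in-process class; a sketch, not a RabbitMQ feature.
final class ConflatingQueue[K, V] {
  private val lock = new ReentrantLock()
  private val notEmpty = lock.newCondition()
  // Insertion-ordered; re-inserting an existing key keeps its position.
  private val pending = new java.util.LinkedHashMap[K, V]()

  // Store the newest value for `key`, replacing any unprocessed one.
  def offer(key: K, value: V): Unit = {
    lock.lock()
    try {
      pending.put(key, value)
      notEmpty.signal()
    } finally lock.unlock()
  }

  // Take the longest-waiting key and its newest value, blocking for up to
  // `timeoutMs` milliseconds; None if nothing arrives in time.
  def poll(timeoutMs: Long): Option[(K, V)] = {
    lock.lock()
    try {
      var remaining = TimeUnit.MILLISECONDS.toNanos(timeoutMs)
      // Re-check after every wake-up (spurious or stolen by another consumer).
      while (pending.isEmpty() && remaining > 0L)
        remaining = notEmpty.awaitNanos(remaining)
      if (pending.isEmpty()) None
      else {
        val it = pending.entrySet().iterator()
        val entry = it.next()
        val result = (entry.getKey(), entry.getValue())
        it.remove()
        Some(result)
      }
    } finally lock.unlock()
  }

  def size: Int = {
    lock.lock()
    try pending.size() finally lock.unlock()
  }
}

val q = new ConflatingQueue[String, Double]
q.offer("UBS", 12.30)
q.offer("ABB", 22.30)
q.offer("UBS", 12.28) // replaces the pending UBS price; UBS stays first
```

After these three offers the queue holds two entries, and a consumer calling poll receives ("UBS", 12.28) and then ("ABB", 22.30). Because the while loop re-checks emptiness after each wake-up, several consumer threads could share one queue.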

> Are you sure you wouldn't be better off using a cache?

Could you elaborate on this option?

Thanks for all your advice so far,

-jason

