[rabbitmq-discuss] Message Aggregating Queue
Jason Zaugg
jzaugg at gmail.com
Thu Apr 28 14:29:54 BST 2011
On Thu, Apr 28, 2011 at 3:01 PM, Alexis Richardson <alexis at rabbitmq.com> wrote:
> On Thu, Apr 28, 2011 at 1:54 PM, Jason Zaugg <jzaugg at gmail.com> wrote:
>> Another requirement I should make explicit: we have a single consumer
>> reading the stream of price updates for all stocks, and it should
>> process them in order. That is, we don't want 'UBS' to move to the
>> back of the queue if it updates before the previous message is
>> processed. So a data structure like a LinkedHashMap would be needed
>> for the custom queue proposed by Alvaro.
>
> I'm confused. Would this stream contain exactly one price per ticker
> symbol? If so then why does it have to be a stream and not a set? If
> it must be a stream with multiple values per ticker, then just use
> direct exchanges and one queue for the full stream.
We have one stream of market data updates being fed to a consumer (or
a pool of consumers) that needs to keep up with the updates. But we
know something about these messages: a newer update for a stock
obsoletes any unprocessed updates for the same stock. We can provide
the unique ID as a routing key, a header, or similar. We want to exploit
this fact to ensure that the memory usage of the broker is bounded by
the number of stocks rather than the number of unprocessed ticks, to
avoid unnecessary work in the consumer, and to improve latency.
I don't want to make too many assumptions about what configuration of
exchange(s) and queue(s) is required (largely because my knowledge of
Rabbit is a little bit thin).
Here's how I'd do this in code (uncompiled and ignoring thread safety):

  import scala.collection.mutable

  case class Ticker(name: String)
  case class PriceUpdate(ticker: Ticker, price: Double)
  class Ref[T](var value: T)

  // Insertion-ordered map: one slot per ticker, holding its latest
  // unprocessed update.
  val queue = new mutable.LinkedHashMap[Ticker, Ref[PriceUpdate]]

  def enqueue(update: PriceUpdate): Unit = {
    queue.get(update.ticker) match {
      // Already queued: overwrite in place so the ticker keeps its position.
      case Some(ref) => ref.value = update
      case None      => queue += (update.ticker -> new Ref(update))
    }
  }

  def dequeue: Option[PriceUpdate] = {
    // Take the oldest entry, i.e. the head of the insertion order.
    queue.headOption match {
      case Some((ticker, ref)) =>
        queue.remove(ticker)
        Some(ref.value)
      case None =>
        None
    }
  }

  enqueue(PriceUpdate(Ticker("UBS"), 12.30))
  enqueue(PriceUpdate(Ticker("ABB"), 22.30))
  enqueue(PriceUpdate(Ticker("UBS"), 12.28))
  assert(dequeue == Some(PriceUpdate(Ticker("UBS"), 12.28)))
  assert(dequeue == Some(PriceUpdate(Ticker("ABB"), 22.30)))
  assert(dequeue == None)
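Since the snippet above ignores thread safety, here is a rough sketch of
how the same idea might be guarded for one producer thread and one
consumer thread. ConflatingQueue and keyOf are names I made up for
illustration; nothing here is a Rabbit API. It assumes a single coarse
lock is acceptable, and it relies on the fact that overwriting an
existing key in a LinkedHashMap keeps that key's original position, so
the Ref wrapper is not needed.

```scala
import scala.collection.mutable

final case class Ticker(name: String)
final case class PriceUpdate(ticker: Ticker, price: Double)

// Hypothetical helper: keeps at most one pending value per key, in the
// order in which each key was first enqueued.
final class ConflatingQueue[K, V](keyOf: V => K) {
  private val pending = mutable.LinkedHashMap.empty[K, V]

  // Overwriting an existing key replaces the value but not its position.
  def enqueue(value: V): Unit = synchronized {
    pending.update(keyOf(value), value)
  }

  // Removes and returns the oldest pending value, if there is one.
  def dequeue(): Option[V] = synchronized {
    pending.headOption.map { case (key, value) =>
      pending.remove(key)
      value
    }
  }

  // Number of pending values; never more than the number of distinct keys.
  def size: Int = synchronized(pending.size)
}
```

Constructed as new ConflatingQueue[Ticker, PriceUpdate](_.ticker), the
number of pending entries can never exceed the number of distinct
tickers, which is the memory bound I'm after.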
> Are you sure you wouldn't be better off using a cache?
Could you elaborate on this option?
Thanks for all your advice so far,
-jason