[rabbitmq-discuss] Request for comments

Valentino Volonghi dialtone at gmail.com
Sat Jun 20 03:38:02 BST 2009

So... As everybody these days I'm writing a task queue, the thing that  
is different
from the other alternatives (that I also have written myself) is that  
this one is
kind of a copy of ideas coming from the just released google app  
engine taskqueue.

First of all... the name is quebert and you can find it here:

I'm writing in RabbitMQ-discuss because the project uses AMQP (and  
I've tested it
with RabbitMQ 1.6.0 [minimum required for one feature]) as a distributed
transactional queue, and also because here there are probably many many
developers that are doing something similar.

So how does the system work... Essentially it's explained here:

There are 3 components in this system:

  - a queue of tasks
  - some dispatcher
  - something that executes tasks from web requests

How does this work:

  - When somebody adds a task, it mainly pushes a URL that should be  
requested by the

  - the dispatcher gets the task and makes the request

  - If the task completes successfully (with a 200) then it's  
considered completed, otherwise
     it's rescheduled.

Now... My small project, the actual code is around 250 lines in  
Twisted Matrix plus
roughly the same number for a small wrapper against txamqp (must get  
this fresh
from launchpad until they release the next one), has a couple of extras:

- It's more like a framework rather than an end user application and  
you have full control
over the dispatcher (called mediator). For example it's trivial to  
implement a mediator that
does exponential backoff in case of failures (in fact it's already  
implemented and it's roughly
30 lines of code including comments).

- It includes pre and post execution hooks so that you can easily  
update the task status
on an external service, or filter tasks that you don't want to run  
anymore or update other
kinds of counters etc. etc.

- It implements time to live (and you can easily implement priorities  
like in the example)
   using timestamps even if rabbitmq doesn't support it. The added  
cost is trivial for this kind
   of use.

  - If you need it, it uses basic_qos.prefetch_count to limit the  
number of events that each
    listener receives concurrently (the mediator itself has no such  

There are many nice things that come with a queue done with HTTP  
-  you can reuse existing tools!
    For example you might have a load balancer that keeps track of  
machine load or response
    time or any other metric (like what elastic load balancer does on  
AWS) and have that guy
    route the task request to the freshest worker machine.

- You can easily write executors in the language that you need for  
that specific task, it just
    needs an HTTP server and some encode/decoder for the wire format  
(by default JSON).

- The executors don't even need to be in the same network since the  
HTTP request can go
    anywhere and since it's created just when you need it and only for  
the time it takes to execute
    it there's no added cost for maintaining it open.

And I'm pretty sure there are many other cool things you can think of  
for your specific case.

The project has still one small bit of problem and that would be  
unittests that I still need to
write... lazy... But I'll do that soon. However the source code is  
pretty well formatted and
fully documented in all the obscure places. I would like comments from  
anybody interested
or not interested so that I can cover the most usecases possible.

Now... given that this guy doesn't need to keep track of machine load  
for load balancing or
other things and it's actually fairly simple and requires just a  
little bit of knowledge to be
used, and almost no deployment complexity, taking from one of the  

This is an example of a custom dispatcher that backoffs exponentially
on failures, prints before and after the task is executed and supports
3 different priority levels with 5 events concurrently in each.


from twisted.internet import reactor
from twisted.python import log

from quebert import amqp, mediator
def _run():
     username = "guest"
     password = "guest"
     host = "localhost"
     exchange = "flow"

     def pre_hook(task):
         log.msg("I'm about to execute this %s" % (task,))

     def post_process(task, result):
         log.msg("Result for task %s is %s" % (task, result))

     m = mediator.DelayOnErrorMediator()


     def onConnectionError(e):

     client = amqp.TxAMQPClient(host, username, password)
     routingKeys = [("tasks.high", "high-priority-queue"),
                    ("tasks.medium", "medium-priority-queue"),
                    ("tasks.low", "low-priority-queue")]

     for key, queue in routingKeys:
         client.watch(exchange, queue, key, m.messageReceived,
                      onDisconnect=onConnectionError, maxEvents=5

import sys
reactor.callLater(0, _run)

Valentino Volonghi aka Dialtone
Now running MacOS X 10.5
Home Page: http://www.twisted.it

-------------- next part --------------
A non-text attachment was scrubbed...
Name: PGP.sig
Type: application/pgp-signature
Size: 194 bytes
Desc: This is a digitally signed message part
Url : http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20090619/8b2831f1/attachment.pgp 

More information about the rabbitmq-discuss mailing list