[rabbitmq-discuss] Request for comments
dialtone at gmail.com
Sat Jun 20 03:38:02 BST 2009
So... As everybody these days I'm writing a task queue, the thing that
from the other alternatives (that I also have written myself) is that
this one is
kind of a copy of ideas coming from the just released google app
First of all... the name is quebert and you can find it here:
I'm writing in RabbitMQ-discuss because the project uses AMQP (and
I've tested it
with RabbitMQ 1.6.0 [minimum required for one feature]) as a distributed
transactional queue, and also because here there are probably many many
developers that are doing something similar.
So how does the system work... Essentially it's explained here:
There are 3 components in this system:
- a queue of tasks
- some dispatcher
- something that executes tasks from web requests
How does this work:
- When somebody adds a task, it mainly pushes a URL that should be
requested by the
- the dispatcher gets the task and makes the request
- If the task completes successfully (with a 200) then it's
considered completed, otherwise
Now... My small project, the actual code is around 250 lines in
Twisted Matrix plus
roughly the same number for a small wrapper against txamqp (must get
from launchpad until they release the next one), has a couple of extras:
- It's more like a framework rather than an end user application and
you have full control
over the dispatcher (called mediator). For example it's trivial to
implement a mediator that
does exponential backoff in case of failures (in fact it's already
implemented and it's roughly
30 lines of code including comments).
- It includes pre and post execution hooks so that you can easily
update the task status
on an external service, or filter tasks that you don't want to run
anymore or update other
kinds of counters etc. etc.
- It implements time to live (and you can easily implement priorities
like in the example)
using timestamps even if rabbitmq doesn't support it. The added
cost is trivial for this kind
- If you need it, it uses basic_qos.prefetch_count to limit the
number of events that each
listener receives concurrently (the mediator itself has no such
There are many nice things that come with a queue done with HTTP
- you can reuse existing tools!
For example you might have a load balancer that keeps track of
machine load or response
time or any other metric (like what elastic load balancer does on
AWS) and have that guy
route the task request to the freshest worker machine.
- You can easily write executors in the language that you need for
that specific task, it just
needs an HTTP server and some encode/decoder for the wire format
(by default JSON).
- The executors don't even need to be in the same network since the
HTTP request can go
anywhere and since it's created just when you need it and only for
the time it takes to execute
it there's no added cost for maintaining it open.
And I'm pretty sure there are many other cool things you can think of
for your specific case.
The project has still one small bit of problem and that would be
unittests that I still need to
write... lazy... But I'll do that soon. However the source code is
pretty well formatted and
fully documented in all the obscure places. I would like comments from
or not interested so that I can cover the most usecases possible.
Now... given that this guy doesn't need to keep track of machine load
for load balancing or
other things and it's actually fairly simple and requires just a
little bit of knowledge to be
used, and almost no deployment complexity, taking from one of the
This is an example of a custom dispatcher that backoffs exponentially
on failures, prints before and after the task is executed and supports
3 different priority levels with 5 events concurrently in each.
from twisted.internet import reactor
from twisted.python import log
from quebert import amqp, mediator
username = "guest"
password = "guest"
host = "localhost"
exchange = "flow"
log.msg("I'm about to execute this %s" % (task,))
def post_process(task, result):
log.msg("Result for task %s is %s" % (task, result))
m = mediator.DelayOnErrorMediator()
client = amqp.TxAMQPClient(host, username, password)
routingKeys = [("tasks.high", "high-priority-queue"),
for key, queue in routingKeys:
client.watch(exchange, queue, key, m.messageReceived,
Valentino Volonghi aka Dialtone
Now running MacOS X 10.5
Home Page: http://www.twisted.it
-------------- next part --------------
A non-text attachment was scrubbed...
Size: 194 bytes
Desc: This is a digitally signed message part
Url : http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20090619/8b2831f1/attachment.pgp
More information about the rabbitmq-discuss