I did get the tip revision to work (the problem was just that I hadn't shut down the server properly). However, I'm still not seeing the desired behavior. I have two programs:<br><br>The client, which does this:<br>
<br>ch.QueueDeclare("myqueue");<br>ch.ExchangeDeclare("ex", ExchangeType.Direct);<br>ch.QueueBind("myqueue", "ex", "test", false, null);<br>
for (int i = 0; i < 11; i++ )<br> ch.BasicPublish("ex", "test", null, Encoding.UTF8.GetBytes("message " + i));<br><br>and the server, which does this:<br><br> QueueingBasicConsumer consumer = new QueueingBasicConsumer();<br>
ch.BasicConsume("myqueue", true, null, consumer);<br> ch.BasicQos(0, 1, false);<br><br> while (true)<br> {<br> try<br> {<br>
BasicDeliverEventArgs e = (BasicDeliverEventArgs) consumer.Queue.Dequeue();<br> Console.WriteLine(Encoding.UTF8.GetString(e.Body));<br><br> Thread.Sleep(seconds * 1000); //pretend to take a while<br>
<br> ch.BasicAck(e.DeliveryTag, false);<br> }<br><br><br>If I run the client and then two different instances of the server, one with the seconds variable set to 1 second and the other with 2, I'd expect the former instance to print twice as many message bodies. Instead, they alternate, as if they had just aggressively fetched everything in the queue in a round-robin way.<br>
<br>Do I have something conceptually wrong, or is that an implementation issue?<br><br><br><div class="gmail_quote">On Tue, Apr 21, 2009 at 10:19 PM, Isaac Cambron <span dir="ltr"><<a href="mailto:icambron@gmail.com" target="_blank">icambron@gmail.com</a>></span> wrote:<br>
<blockquote class="gmail_quote" style="border-left: 1px solid rgb(204, 204, 204); margin: 0pt 0pt 0pt 0.8ex; padding-left: 1ex;">
Matthias,<br><br>Thanks, that's very helpful. A couple of follow-up questions:<br><br>1. What snapshot should I use? I tried out the default revision (not sure what that tracks) and it didn't queue like I expected; the first consumer to subscribe got all the tasks, even if more subscribers were added before the acks. I can't get the tip to run at all (crashes on startup). Presumably I need something in between?<br>
<br>2. It's not clear to me what all of the other available qos parameters mean. It appears (and again, wasn't able to try this on the tip revision) that only BasicQos(0, n, false) is supported. This sounds like it's what I want anyway (with, obv, n=1); is that right?<br>
<br>If what I have was expected to work, I can provide source code, or the crash dump if that's unexpected too.<br><br>Thanks,<br><font color="#888888">Isaac<br><br></font><div class="gmail_quote"><div>On Tue, Apr 21, 2009 at 7:38 PM, Matthias Radestock <span dir="ltr"><<a href="mailto:matthias@lshift.net" target="_blank">matthias@lshift.net</a>></span> wrote:<br>
</div><div><div></div><div><blockquote class="gmail_quote" style="border-left: 1px solid rgb(204, 204, 204); margin: 0pt 0pt 0pt 0.8ex; padding-left: 1ex;">Isaac,<div><br>
<br>
Isaac Cambron wrote:<br>
<blockquote class="gmail_quote" style="border-left: 1px solid rgb(204, 204, 204); margin: 0pt 0pt 0pt 0.8ex; padding-left: 1ex;">
I have a client that will publish a bunch of tasks to execute - tasks A, B, C, D, E. I have a pool of machines that, individually, know how to do some subset of the tasks. For example, let's say that machine 1 can do A, B, and C, and machine 2 can do C, D, and E. A machine can only do one task at a time, and I only want one machine to do any particular task. Tasks can wait indefinitely until there's a free machine capable of executing it.<br>
<br>
I'm not sure what pieces go where. Do I create one queue per task? (If so, how will I make the machines only respond to one message from any queue at a time?)<br>
</blockquote>
<br></div>
That, in combination with a basic.qos prefetch count of 1, should work.<br>
<br>
More specifically ...<br>
<br>
- create a direct exchange to which tasks get published, using the task type as the routing key<br>
<br>
- create one queue per task type and bind it to the direct exchange with the tasks type as the binding key<br>
<br>
- let each worker machine open a channel, set the basic.qos prefetch count to 1, and then subscribe to (basic.consume) each of the queues corresponding to task types supported by that worker.<br>
<br>
- when a worker is done with a task it basic.ack's the message<br>
<br>
<br>
The prefetch count limits the number of tasks a worker will be sent before ack'ing them. Note that the basic.qos functionality required to do that has not made it into an official release yet; you'll have to use a recent snapshot of the RabbitMQ source. Your example is actually a pretty interesting use case for basic.qos, exploiting some of the more advanced aspects of that feature, so I'd love to hear how it works out for you.<br>
<br>
<br>
Regards,<br><font color="#888888">
<br>
Matthias.<br>
</font></blockquote></div></div></div><br>
</blockquote></div><br>