[rabbitmq-discuss] dotnet client - Increasing prefetch with BasicQoS during HandleBasicDeliver doesn't trigger retrieval of next message

Jon Stelly jonstelly at gmail.com
Mon May 12 14:08:38 BST 2014


Thanks Simon,

I've tried global as true and false for BasicQoS and see the same behavior.
 I've got some quick example code below that demonstrates the issue I'm
having and a zip archive of the project is available
here<https://www.dropbox.com/s/x7q9e6ijuxzp9zn/RabbitQoSTest.zip>.
 I captured network traffic from WireShark and it seems that calling
BasicQoS() and increasing the prefetchCount, the qos change gets sent to
the server, and the server replies with a qos-ok, but the server doesn't
send more messages if the new prefetch count is greater than the
outstanding message count for the channel.  It's not until I Ack the first
message that the second one is sent.

It seems to me that an Ack from a client is causing the server to look at
the queue being consumed and do something like:

if(queue.has-messages && channel.unacked-messages < channel.prefetch-count)
    send-messages-to-channel-until-unacked-equals-prefetch-count(channel)

For the behavior I'd like to see, it seems like a call to basic.qos would
require the same logic.

Below is a snippet of my example code.  ActionConsumer is a simple
IBasicConsumer that invokes the action passed to it's constructor during
HandleBasicDeliver.  It also uses a simple extension method to List<T>
called WaitFor() which waits for the list to contain the specified number
of items.  I've added comments based on my network capture of this code
being executed.

Thanks for any help.  Right now, I've got a pretty ugly workaround, which
is that instead of increasing prefetch count and calling basicqos, I just
do a basicget, but this is definitely hacky and makes me poll using
basicget() on another thread, etc...

Thanks.


var consumer = new ActionConsumer(Model, message =>
    Task.Run(() =>
    {
        Log.DebugFormat("Received({0})", message);
        messages.Add(Encoding.UTF8.GetString(message.Body));
        if (messages.Count == 1)
        {
            //If this is the first message, send a second, set qos to 2 and
wait for second message to arrive
            Send("World"); //This message gets sent immediately
            Log.Debug("BasicQoS(0, 2, false)");
            Model.BasicQos(0, 2, false); //QoS is sent to server
immediately, prefetch shows as 2 in management console
            Log.Debug("WaitFor(2, 5000)");
            messages.WaitFor(2, 5000); //While waiting, 2nd message will
not be received
            Log.DebugFormat("BasicAck({0}, false)", message.DeliveryTag);
            Model.BasicAck(message.DeliveryTag, false); //As soon as Ack is
sent, 2nd message is transmitted to client
            messages.WaitFor(2, 2500);
            Log.Debug("Finished task 1 processing");
        }
    }));

//Start consuming
Model.BasicConsume(queue, false, consumer);

//Send starter message
Send("Hello");
var ret = messages.WaitFor(2, 15000);


On Mon, May 12, 2014 at 4:08 AM, Simon MacMullen <simon at rabbitmq.com> wrote:

> On 11/05/2014 04:55, Jon Stelly wrote:
>
>> I have a sort of odd scenario.  I want to process 1 message at a time so
>> I call basic qos with a prefetch of 1 and call BasicConsume(), but
>> sometimes my processing will end in a long blocking task.  When that
>> happens, I would like to increase the qos prefetch to 2 to trigger the
>> next message to be received and processed.  The issue is that I don't
>> want to ack the first message until after the long blocking process
>> completes successfully.
>>
>> If I call BasicQoS before acking the first message, the 2nd message
>> isn't received until after the long running task completes and the ack
>> is sent.  Can anyone explain what's going on and possibly suggest an
>> alternative?
>>
>
> You are running 3.3.x? The new semantics for consumer prefetch mean that
> setting basicQos after consuming will only define a prefetch for *new*
> consumers. You could work around this by setting global=true.
>
> See http://www.rabbitmq.com/consumer-prefetch.html for more information.
>
> I am starting to wonder if the global=false qos should set the prefetch
> for all consumers on the channel; you wouldn't be able to have one channel
> with multiple consumers with different prefetch, but this is an area that
> seems to be unintuitive...
>
> Cheers, Simon
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20140512/920bae26/attachment.html>


More information about the rabbitmq-discuss mailing list