[rabbitmq-discuss] channel.flow calls causing code 541 INTERNAL_ERROR

Cameron Harris thecwin at gmail.com
Fri Jul 29 14:21:46 BST 2011


Hi all,

I have a process that consumes from a queue with BasicConsume, but if
there is a problem on the consumer that slows down message processing,
RabbitMQ will carry on delivering messages until the process runs out
of memory and crashes. In order to fix this, I created a second thread
that watches for too many messages building up in the client and calls
ChannelFlow(false) on the channel. After the consumer catches up
again, it calls ChannelFlow(true) to re-enable the channel flow.
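
To make the watcher logic concrete, here is a minimal sketch of a
high/low watermark check. It is an illustration only, not the code from
my application: the FlowGate class, its thresholds, and the setFlow
callback (which stands in for the ChannelFlow call) are all hypothetical
names, and the sketch says nothing about the broker error itself.

```csharp
using System;

// Hypothetical helper: tracks the client-side backlog and toggles flow
// with hysteresis, so flow is switched only when a watermark is crossed.
class FlowGate
{
    private readonly int high;              // pause at or above this backlog
    private readonly int low;               // resume at or below this backlog
    private readonly Action<bool> setFlow;  // assumed to wrap ChannelFlow
    private bool flowing = true;

    public FlowGate(int high, int low, Action<bool> setFlow)
    {
        if (low >= high) throw new ArgumentException("low must be below high");
        this.high = high;
        this.low = low;
        this.setFlow = setFlow;
    }

    // Call periodically from the watcher thread with the current backlog.
    public void Check(int backlog)
    {
        if (flowing && backlog >= high)
        {
            setFlow(false);   // too many messages queued in the client
            flowing = false;
        }
        else if (!flowing && backlog <= low)
        {
            setFlow(true);    // consumer has caught up again
            flowing = true;
        }
    }

    public bool Flowing { get { return flowing; } }
}
```

With the client it would be wired up along the lines of
new FlowGate(10000, 1000, active => consumerModel.ChannelFlow(active)),
where both thresholds are arbitrary example values. The gap between the
two watermarks keeps the number of ChannelFlow calls low.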

Unfortunately, since switching the broker to Windows Server 2008 and
upgrading both the server and client to 2.5.1, the broker is
occasionally terminating my connection on the ChannelFlow call. The
exception text from the .NET client is:

       The AMQP operation was interrupted: AMQP close-reason,
       initiated by Peer, code=541, text="INTERNAL_ERROR", classId=0,
       methodId=0, cause=

When I was running on RHEL 6 I didn’t experience any problems, but I
hadn’t tested it thoroughly, so it might just be less likely to occur
on that setup. The clients are all running on Windows and using the
.NET client.

When this error occurs, the following is written to the RabbitMQ server log:


=ERROR REPORT==== 29-Jul-2011::13:25:28 ===
** Generic server <0.2462.52> terminating
** Last message in was {'$gen_cast',{method,{'channel.flow',false},none}}
** When Server state == {ch,running,rabbit_framing_amqp_0_9_1,1,<0.2459.52>,
                         <0.2461.52>,<0.2459.52>,undefined,
                         #Fun<rabbit_channel_sup.0.15412730>,none,
                         {set,0,16,16,8,80,48,
                          {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
                          {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},
                         4089,
                         {[],[]},
                         {[],[]},
                         {user,<<"guest">>,true,rabbit_auth_backend_internal,
                          {internal_user,<<"guest">>,
                           <<99,244,141,154,192,80,146,251,179,199,206,114,71,
                             177,54,91,30,76,32,168>>,
                           true}},
                         <<"/">>,<<"testq">>,
                         {dict,1,16,16,8,80,48,
                          {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
                          {{[],[],[],[],[],[],[],[],[],[],[],
                            [[<<"amq.ctag-KeIdUjHd5LF+Zcti3NHf1A==">>|
                              {{amqqueue,
                                {resource,<<"/">>,queue,<<"testq">>},
                                false,false,none,[],<0.26681.51>},
                               #Ref<0.0.8.186885>}]],
                            [],[],[],[]}}},
                         {dict,0,16,16,8,80,48,
                          {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
                          {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},
                         {dict,1,16,16,8,80,48,
                          {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
                          {{[],
                            [[#Ref<0.0.8.186885>|
                              <<"amq.ctag-KeIdUjHd5LF+Zcti3NHf1A==">>]],
                            [],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},
                         <0.2457.52>,
                         {state,fine,{1311942332995907,#Ref<0.0.8.186868>}},
                         false,1,
                         {0,nil},
                         {0,nil},
                         [],
                         [{<<"basic.nack">>,bool,true},
                          {<<"publisher_confirms">>,bool,true},
                          {<<"consumer_cancel_notify">>,bool,true},
                          {<<"exchange_exchange_bindings">>,bool,true}],
                         none}
** Reason for termination ==
** {{badmatch,{error,already_present}},
    [{rabbit_channel_sup,'-start_limiter_fun/1-fun-0-',2},
     {rabbit_channel,start_limiter,1},
     {rabbit_channel,handle_method,3},
     {rabbit_channel,handle_cast,2},
     {gen_server2,handle_msg,2},
     {proc_lib,init_p_do_apply,3}]}



Here is the code I’m using that will trigger the error:


// ---------------------------- Start code ------------------------------------

using System;
using System.Threading;

using RabbitMQ.Client;

namespace test
{
    class Consumer : IBasicConsumer
    {
        private int count;

        public Consumer(IModel model) { this.Model = model; }

        public void HandleBasicConsumeOk(string consumerTag) { }
        public void HandleBasicCancelOk(string consumerTag) { }
        public void HandleBasicCancel(string consumerTag) { }
        public void HandleModelShutdown(IModel model,
            ShutdownEventArgs reason) { }

        public void HandleBasicDeliver(string consumerTag,
            ulong deliveryTag, bool redelivered, string exchange,
            string routingKey, IBasicProperties properties, byte[] body)
        {
            Interlocked.Increment(ref count);
            if (count % 1000 == 0) Console.WriteLine("Consumed {0}", count);
        }

        public IModel Model { get; set; }

        public int Count { get { return count; } }
    }

    class Program
    {
        static void Main()
        {
            var cf = new ConnectionFactory
                {
                    HostName = "vmdevrmq1.apdev.local",
                    UserName = "guest",
                    Password = "guest",
                    Port = AmqpTcpEndpoint.UseDefaultPort
                };

            var conn = cf.CreateConnection();
            new Thread(
                x =>
                    {
                        var model = conn.CreateModel();
                        model.QueueDeclare("testq", false, false, false, null);

                        var props = model.CreateBasicProperties();
                        for (var i = 0; ; i++)
                        {
                            if (i % 1000 == 0)
                            {
                                Console.WriteLine("Published {0}", i);
                            }
                            model.BasicPublish("", "testq", false,
                                false, props, new byte[] { });
                        }
                    }).Start();

            var consumerModel = conn.CreateModel();
            consumerModel.QueueDeclare("testq", false, false, false, null);

            var consumer = new Consumer(consumerModel);
            consumerModel.BasicConsume("testq", true, null, consumer);

            bool flowEnabled = true;
            while (true)
            {
                flowEnabled = !flowEnabled;
                Console.WriteLine("Setting Channel Flow: {0}", flowEnabled);
                consumerModel.ChannelFlow(flowEnabled);
            }
        }
    }
}

// ---------------------------- End code ------------------------------------

I accept that this might be an error in my usage, in particular the
way I’m calling ChannelFlow on my own thread while messages are being
consumed on the connection thread. I didn’t know how else to do it,
though (calling it from the connection thread inside the consumer
causes deadlocks), and in any case I wouldn’t have expected an internal
error. Another approach would be explicit acks and basic.qos, but when
it worked, the ChannelFlow approach was much faster.
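
For comparison, the explicit-ack alternative would look roughly like
the sketch below. It is untested against this setup: the AckingConsumer
and QosExample names and the prefetch count of 100 are hypothetical,
and it assumes the client's DefaultBasicConsumer base class together
with the BasicQos and BasicAck methods on IModel.

```csharp
using RabbitMQ.Client;

// Hypothetical consumer: acknowledges each message after processing it.
class AckingConsumer : DefaultBasicConsumer
{
    public AckingConsumer(IModel model) : base(model) { }

    public override void HandleBasicDeliver(string consumerTag,
        ulong deliveryTag, bool redelivered, string exchange,
        string routingKey, IBasicProperties properties, byte[] body)
    {
        // ... process body here ...
        Model.BasicAck(deliveryTag, false);  // false = ack this delivery only
    }
}

static class QosExample
{
    public static void Start(IModel model)
    {
        // Ask the broker to keep at most 100 unacknowledged deliveries
        // outstanding on this channel (100 is an arbitrary example value).
        model.BasicQos(0, 100, false);

        var consumer = new AckingConsumer(model);
        // noAck = false, so the broker waits for BasicAck before sending
        // more than the prefetch limit.
        model.BasicConsume("testq", false, null, consumer);
    }
}
```

With this arrangement the broker itself stops delivering once the
prefetch limit is reached, so no second thread is needed, at the cost
of one ack per message.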

Thanks!
Cameron

