[rabbitmq-discuss] Trying to do a simple subscription returns message too long error... (RabbitMq.Net)

Patrick Kenney pekenney at gmail.com
Wed Jul 29 20:00:36 BST 2009


Does anybody see anything in the code snippet below that could cause the
"message too long" error displayed at the bottom? Note that this error occurs
on subscribe, not on publish.

I can reproduce it on a Windows XP Pro workstation and on a Windows Server 2003 box.

<snip>
//winform app
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Collections;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
using System.ServiceModel.Web;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.ServiceModel.Description;

using Test.Base;
using Test.Entity;

using RabbitMQ.ServiceModel;
using RabbitMQ.Client;

namespace RabbitMqPubSub1
{
    /// <summary>
    /// Test form that hosts a dispatcher service and a subscriber service through
    /// <see cref="RabbitMQBinding"/> and drives them from button clicks: publish a
    /// test message, subscribe, host either service, or declare and bind a queue.
    /// </summary>
    public partial class frmMain : Form
    {
        // Broker endpoint passed to every RabbitMQBinding this form creates.
        private const string BrokerUri = "amqp://localhost:5672/";

        // Base address passed to both ServiceHost instances.
        private const string HostBaseAddress = "soap.amqp:///";

        // Addresses used when the corresponding text box is left empty.
        private const string DefaultDispatcherAddress = "amqp:///dispatcher";
        private const string DefaultSubscriberAddress = "amqp:///subscriber";

        // Number of times btnPub_Click publishes the test message;
        // change this value for generic testing.
        private const int PublishCount = 1000;

        // NOTE(review): the hosts are never closed in the code shown here, and a
        // second click on a "host" button replaces the reference without closing
        // the previous host - confirm whether the other half of this partial
        // class handles shutdown.
        private ServiceHost dispatcherHost = null;
        private ServiceHost subscriberHost = null;

        // Last address resolved for each role (text box content or the default).
        private string m_strSubscriber = string.Empty;
        private string m_strDispatcher = string.Empty;
        private string m_strSubscriberHost = string.Empty;

        public frmMain()
        {
            InitializeComponent();
        }

        /// <summary>
        /// Returns <paramref name="entered"/> unless it is empty, in which case
        /// <paramref name="fallback"/> is returned.
        /// </summary>
        /// <param name="entered">Text typed by the user.</param>
        /// <param name="fallback">Address to use when nothing was typed.</param>
        /// <returns>The address to use.</returns>
        private static string AddressOrDefault(string entered, string fallback)
        {
            return string.IsNullOrEmpty(entered) ? fallback : entered;
        }

        /// <summary>
        /// Creates the binding shared by the client factory and both hosts:
        /// the local broker at <see cref="BrokerUri"/> speaking AMQP 0-8.
        /// </summary>
        private static RabbitMQBinding CreateBinding()
        {
            return new RabbitMQBinding(new Uri(BrokerUri), Protocols.AMQP_0_8);
        }

        /// <summary>
        /// Builds a service host with a single endpoint on the RabbitMQ binding.
        /// The host is returned unopened; the caller opens it.
        /// </summary>
        /// <param name="serviceType">Service implementation type to host.</param>
        /// <param name="contractType">Service contract exposed by the endpoint.</param>
        /// <param name="address">Endpoint address.</param>
        /// <returns>The configured, not yet opened, host.</returns>
        private static ServiceHost CreateHost(Type serviceType, Type contractType, string address)
        {
            // The ServiceHost must specify a base or absolute endpoint address
            // under the soap.amqp scheme. An endpoint should then be added to the
            // service using the RabbitMQBinding.
            ServiceHost host = new ServiceHost(serviceType, new Uri(HostBaseAddress));
            host.AddServiceEndpoint(contractType, CreateBinding(), address);
            host.CloseTimeout = TimeSpan.FromMinutes(10);
            return host;
        }

        /// <summary>
        /// Creates a dispatcher channel, runs <paramref name="call"/> against it and
        /// releases the channel factory afterwards.
        /// </summary>
        /// <param name="call">Work to perform on the dispatcher channel.</param>
        /// <remarks>
        /// A <c>using</c> block is deliberately not used here. Disposing a
        /// communication object calls Close(), which can itself throw when the
        /// object has faulted and would then hide the exception that caused the
        /// fault. The factory is therefore closed only on success and aborted on
        /// failure, and the original exception is rethrown with its stack trace.
        /// </remarks>
        private void CallDispatcher(Action<IDispatcherService> call)
        {
            ChannelFactory<IDispatcherService> factory = GetDispatcherCF();
            try
            {
                call(factory.CreateChannel());
                factory.Close();
            }
            catch
            {
                factory.Abort();
                throw; // "throw ex;" would discard the original stack trace
            }
        }

        /// <summary>
        /// Publish Test Message: sends the content of txtPubContent, tagged with
        /// context "c1", to the dispatcher <see cref="PublishCount"/> times.
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void btnPub_Click(object sender, EventArgs e)
        {
            CallDispatcher(client =>
            {
                Test.Entity.Message msg = new Test.Entity.Message();
                msg.Source = txtPubContent.Text;
                msg.Contexts.Add("c1");

                for (int i = 0; i < PublishCount; i++)
                {
                    client.Publish(msg);
                }
            });
        }

        /// <summary>
        /// Host The Dispatcher at the address in txtHostDispatcher, or at the
        /// default dispatcher address when that box is empty.
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void btnHostDispatcher_Click(object sender, EventArgs e)
        {
            m_strDispatcher = AddressOrDefault(txtHostDispatcher.Text, DefaultDispatcherAddress);

            dispatcherHost = CreateHost(
                typeof(Dispatcher.DispatcherService),
                typeof(IDispatcherService),
                m_strDispatcher);

            dispatcherHost.Open();
        }

        /// <summary>
        /// Creates a channel factory for the dispatcher service, addressed at the
        /// content of txtHostDispatcher or the default dispatcher address.
        /// </summary>
        /// <returns>A new factory; the caller is responsible for releasing it.</returns>
        private ChannelFactory<IDispatcherService> GetDispatcherCF()
        {
            m_strDispatcher = AddressOrDefault(txtHostDispatcher.Text, DefaultDispatcherAddress);

            // The RabbitMQBinding instantiation is where the error is being raised...
            return new ChannelFactory<IDispatcherService>(
                CreateBinding(),
                new EndpointAddress(m_strDispatcher));
        }

        /// <summary>
        /// Subscribe to queue: registers the subscriber address with the
        /// dispatcher for contexts "c1" and "c2".
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void btnSubscribe_Click(object sender, EventArgs e)
        {
            m_strSubscriber = AddressOrDefault(txtHostSubscriberAddress.Text, DefaultSubscriberAddress);

            CallDispatcher(client =>
                client.Subscribe(new Test.Base.SubscriberInfo(m_strSubscriber, "c1", "c2")));
        }

        /// <summary>
        /// Host Subscriber at the address in txtHostSubscriberAddress, or at the
        /// default subscriber address when that box is empty.
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void btnHostSubscriber_Click(object sender, EventArgs e)
        {
            m_strSubscriberHost = AddressOrDefault(txtHostSubscriberAddress.Text, DefaultSubscriberAddress);

            subscriberHost = CreateHost(
                typeof(Subscriber.SubscriberService),
                typeof(ISubscriberService),
                m_strSubscriberHost);

            subscriberHost.Open();
        }

        /// <summary>
        /// Create Queue: declares a durable queue named after txtQueueName and binds
        /// it using an empty exchange name and an empty routing key.
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void btnCreateQueue_Click(object sender, EventArgs e)
        {
            string address = "localhost:5672";
            string strQueue = txtQueueName.Text;
            bool durable = true;
            string exchange = string.Empty;
            string routingKey = string.Empty;

            using (IConnection connection = new ConnectionFactory().CreateConnection(address))
            using (IModel model = connection.CreateModel())
            {
                // QueueDeclare returns the queue name to use for the binding.
                strQueue = model.QueueDeclare(strQueue, durable);

                model.QueueBind(strQueue, exchange, routingKey, false, null);
            }
        }
    }
}

</snip>

RE:
On Thu, Jul 23, 2009 at 9:13 AM, .... wrote:

> Although there are no messages any longer than "test message" in the
> applicable queue...
>
> the following is returned...
>
> The AMQP operation was interrupted: AMQP close-reason, initiated by
> Library, code=311, text="The body of a message (18446744073709551575 bytes)
> was too long.", classId=0, methodId=0,
> cause=RabbitMQ.Client.Impl.BodyTooLongException: The body of a message
> (18446744073709551575 bytes) was too long.
>    at RabbitMQ.Client.Impl.ContentHeaderBase.ReadFrom(Int32 channelNumber,
> NetworkBinaryReader reader) in
> C:\RabbitMQ.Client\RabbitMQ\Client\Impl\ContentHeaderBase.cs:line 23
>    at RabbitMQ.Client.Impl.CommandAssembler.HandleFrame(Frame f) in
> C:\RabbitMQ.Client\RabbitMQ\Client\Impl\CommandAssembler.cs:line 53
>    at RabbitMQ.Client.Impl.Session.HandleFrame(Frame frame) in
> C:\RabbitMQ.Client\RabbitMQ\Client\Impl\Session.cs:line 16
>    at RabbitMQ.Client.Impl.ConnectionBase.MainLoopIteration() in
> C:\RabbitMQ.Client\RabbitMQ\Client\Impl\ConnectionBase.cs:line 434
>    at RabbitMQ.Client.Impl.ConnectionBase.MainLoop() in
> C:\RabbitMQ.Client\RabbitMQ\Client\Impl\ConnectionBase.cs:line 388
>
> suggestions, comments, assistance?
>
> thanks in advance.
>
>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20090729/e3b0e115/attachment.htm 


More information about the rabbitmq-discuss mailing list