[rabbitmq-discuss] SimpleConsumer EOFException

Susheel Daswani sdaswani at gmail.com
Thu Feb 9 18:12:49 GMT 2012


Thanks Emile - I will look into AMQP heartbeats.

Another issue I've run into is that my QueueingConsumer isn't connecting
reliably. I will start up a QueueingConsumer and it may or may not
connect, depending on how it is feeling ;) . When it doesn't connect,
I usually don't see any exception, though it will sometimes error out
with the EOFException I previously wrote about. FYI, I am sure there
are messages to be consumed, as I can see them in our web admin
interface.

Here is my code, which I pretty much created by following the examples
online (e.g., http://www.rabbitmq.com/tutorials/tutorial-one-java.html):

package com.atti.ratingstrends.rabbitreceiver;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Receiver {

    private String _uri;
    private String _queueName;
    private Connection _connection;
    private Channel _channel;
    private QueueingConsumer _consumer;

    // Used during construction and attempts to re-establish
    private void initConsumer()
        throws java.net.URISyntaxException, java.io.IOException,
               java.security.NoSuchAlgorithmException,
               java.security.KeyManagementException {

        // cleanup
        if (_connection != null)
            _connection.close();
        if (_channel != null)
            _channel.close();

        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(_uri);
        _connection = factory.newConnection();

        _channel = _connection.createChannel();
        _channel.queueDeclare(_queueName, false, false, false, null);

        _consumer = new QueueingConsumer(_channel);
        _channel.basicConsume(_queueName, true, _consumer);
    }

    /**
     * @param uri The full AMQP URI for the RabbitMQ you want to access.
     * @param queueName The name of the queue you want to access.
     */
    // Not opting for a static factory method. May want to subclass,
    // and not many variations of this class are expected.
    public Receiver(String uri, String queueName)
        throws java.net.URISyntaxException, java.io.IOException,
               java.security.NoSuchAlgorithmException,
               java.security.KeyManagementException {

        if (queueName == null || queueName.equals(""))
            throw new java.io.IOException(
                "Queue name must not be null or empty.");
        _queueName = queueName;
        _uri = uri;

        initConsumer();
    }

    /** Will block.
     */
    public String getNextMessageBody()
        throws java.lang.InterruptedException {
        try {
            QueueingConsumer.Delivery delivery =
                _consumer.nextDelivery();
            return new String(delivery.getBody());
        }
        // Catching runtime exceptions is bad, but here I don't understand
        // why we are seeing EOF exceptions, since RabbitMQ should never
        // EOF. So attempt to restart the connection ONCE.
        catch (com.rabbitmq.client.ShutdownSignalException io) {
            if (io.getReason() instanceof java.io.EOFException) {
                System.out.println("Receiver.getNextMessageBody(): "
                    + "The RabbitMQ threw an EOF - attempting to re-start "
                    + "connection:" + io);
                try {
                    initConsumer();
                    QueueingConsumer.Delivery delivery =
                        _consumer.nextDelivery();
                    return new String(delivery.getBody());
                }
                catch (Exception e) {
                    System.out.println("Receiver.getNextMessageBody(): "
                        + "The RabbitMQ threw an EOF - restart attempt failed!");
                    throw io;
                }
            }
            throw io;
        }
    }

    /** Caution - will empty all old messages.
     * @return The number of messages purged.
     * @throws java.io.IOException - if an error is encountered.
     */
    public int purge() throws java.io.IOException {
        return _channel.queuePurge(_queueName).getMessageCount();
    }
}
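
For what it's worth, the "retry ONCE" logic in getNextMessageBody() can
be pulled out into a small standalone helper, which makes it easier to
test without a broker. This is only a rough sketch: RetryOnce and
ThrowingSupplier are hypothetical names and are not part of the RabbitMQ
Java client. It also differs from the code above in two ways: it retries
on any RuntimeException rather than only on a ShutdownSignalException
whose reason is an EOFException, and on a second failure it rethrows the
second exception with the first attached as suppressed.

```java
/** Hypothetical functional interface: a supplier that may throw E. */
@FunctionalInterface
interface ThrowingSupplier<T, E extends Exception> {
    T get() throws E;
}

/** Hypothetical helper; not part of the RabbitMQ Java client. */
final class RetryOnce {
    private RetryOnce() {}

    /**
     * Runs action. If it throws a RuntimeException, runs recover and
     * then tries action exactly one more time. A failure on the second
     * attempt propagates to the caller. Checked exceptions of type E
     * are never retried; they propagate immediately.
     */
    static <T, E extends Exception> T call(ThrowingSupplier<T, E> action,
                                           Runnable recover) throws E {
        try {
            return action.get();
        } catch (RuntimeException first) {
            // e.g. tear down and re-create the connection here
            recover.run();
            try {
                return action.get();
            } catch (RuntimeException second) {
                // Keep the original failure visible in the stack trace.
                if (second != first) {
                    second.addSuppressed(first);
                }
                throw second;
            }
        }
    }
}
```

In the Receiver class, the action would be the nextDelivery() call and
the recover step would be initConsumer(), with its checked exceptions
wrapped in an unchecked one, since Runnable cannot throw them.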

Does it look like I am doing anything wrong that would cause the
connection issue?

Thanks!
Susheel

On Feb 9, 2:49 am, Emile Joubert <em... at rabbitmq.com> wrote:
> Hi Susheel,
>
> On 08/02/12 21:48, Susheel Daswani wrote:
>
> > java.io.EOFException
> >    at com.rabbitmq.client.QueueingConsumer.handle(QueueingConsumer.java:
>
> An EOF exception on a socket points to a network problem. You could try
> to eliminate the network by running the test over localhost, or enable
> AMQP heartbeats.
>
> -Emile
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-disc... at lists.rabbitmq.com
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss

