[rabbitmq-discuss] Rabbitmq server crash.

Pankaj Mishra Pankaj.Mishra at indiatimes.co.in
Wed Aug 29 13:12:13 BST 2012


Hi,

We experienced a strange problem with a RabbitMQ server running in a cluster. According to the
log file, the master node crashed. After that, all my publishers continued to send messages without
throwing any exception, but all of those messages were silently dropped by the RabbitMQ server. Consumers were not able
to get any of those messages until we restarted the RabbitMQ server.

I have attached the server crash logs for both the master and the slave to this mail.

We have the following server setup.
There are two servers in one cluster: one stores its data on disc and the other keeps it in RAM.

We have two queues. Both are durable, and our messages are also persistent.

Server version: 2.8.4
Erlang version: Erlang R14B04 (erts-5.8.5) [source] [64-bit] [smp:8:8] [rq:8] [async-threads:0] [hipe] [kernel-poll:false]

We are using mirrored queues to survive the failure of one server. The code of our RabbitMQ client is provided below.

We would be highly grateful for an explanation of the server behaviour described above.

PS: This happens about once a month.

Thanks,
Pankaj


Our consumer code

package com.times.mytimes.rabbitclient;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;

public abstract class BaseConsumer {
            final static Logger logger = LoggerFactory.getLogger(BaseConsumer.class);

            RabbitMqConnectionFactory rabbitMqConnectionFactory;

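            // volatile: written by closeConnection() and read on the connection's shutdown-listener thread
            private volatile boolean shutdownFired = false;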

            private BaseMessageHandler messageHandler;

            private ExecutorService threadExecutor;

            private int numberOFThreads;

            private ConsumerThread  consumerThread;

            private ConnectionShutDownListener connectionShutDownListener;

            private Thread th;

            public BaseConsumer(int numberOfConsumer, RabbitMqConnectionFactory rabbitMqConnectionFactory,
                                    BaseMessageHandler mytimesMessageHandler,int numberOFThreads) throws IOException {
                        this.messageHandler = mytimesMessageHandler;
                        this.rabbitMqConnectionFactory = rabbitMqConnectionFactory;
                        // Set the prefetch count and the worker pool before the consumer thread starts:
                        // createConsumers() reads numberOFThreads and handleDelivery() uses threadExecutor.
                        this.numberOFThreads = numberOFThreads;
                        threadExecutor = Executors.newFixedThreadPool(numberOFThreads);
                        consumerThread= new ConsumerThread(numberOfConsumer, this.rabbitMqConnectionFactory.getConnection());
                        th= new Thread(consumerThread);
                        th.start();
            }


    public abstract String getQueueName();

    public abstract String getRoutingKey();

    public abstract String getExchangeName();




            public void createConsumers(final int numberOfConsumer, final Connection connection ) throws IOException {
                        connectionShutDownListener = new ConnectionShutDownListener(numberOfConsumer);
                        connection.addShutdownListener(connectionShutDownListener);
                        for (int i = 0; i < numberOfConsumer; ++i) {
                                    Channel channel = connection.createChannel();
                                    getQueue(channel);
                                    channel.basicQos(this.numberOFThreads);
                                    binQueuedWithExchange(channel);
                                    createConsumer(channel, i + "th mt consumer");
                        }
            }

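            public Connection getConnection()  {
                        try{
                                    return this.rabbitMqConnectionFactory.getConnection();
                        }catch (Exception e) {
                                    // Log the failure instead of swallowing it; callers retry when null is returned.
                                    logger.error("could not obtain a connection", e);
                        }
                        return null;
            }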

            public void getQueue(Channel channel) throws IOException {
                        Map<String, Object> args = new HashMap<String, Object>();
                        args.put("x-ha-policy", "all");
                        channel.queueDeclare(getQueueName(), true, false, false, args);
            }

            public void binQueuedWithExchange(Channel channel) throws IOException {
                        channel.queueBind(getQueueName(), getExchangeName(), getRoutingKey());
            }

            public void createConsumer(final Channel channel, String consumerTag) throws IOException {
                        channel.basicConsume(getQueueName(), false, consumerTag, new DefaultConsumer(channel) {

                                    @Override
                                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                                    throws IOException
                                    {
                                                long deliveryTag = envelope.getDeliveryTag();
                                                // Decode the body once; new String(byte[]) uses the platform default charset.
                                                String message = new String(body);
                                                logger.info("handling message=={}", message);
                                                Task task= new Task(message, channel, deliveryTag);
                                                threadExecutor.execute(task);
                                    }

                                    /**
                                     * Logs that the consumer was cancelled at the client's request
                                     * (overrides {@link Consumer#handleCancelOk}).
                                     *
                                     * @param consumerTag
                                     *            the defined consumer tag (client- or server-generated)
                                     */
                                    public void handleCancelOk(String consumerTag) {
                                                logger.error("This consumer cancelled ok ---"+consumerTag);
                                    }

                                    /**
                                     * Logs that the consumer was cancelled by something other than a client call,
                                     * for example because the queue was deleted
                                     * (overrides {@link Consumer#handleCancel(String)}).
                                     *
                                     * @param consumerTag
                                     *            the defined consumer tag (client- or server-generated)
                                     */
                                    public void handleCancel(String consumerTag) throws IOException {
                                                logger.error("This consumer cancelled ---"+consumerTag);
                                    }

                                    /**
                                     * Logs that the channel or connection was shut down
                                     * (overrides {@link Consumer#handleShutdownSignal}).
                                     */
                                    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
                                                logger.error("This consumer shutdown ---"+consumerTag , sig);
                                    }

                                    /**
                                     * Logs that a recover request was confirmed
                                     * (overrides {@link Consumer#handleRecoverOk}).
                                     */
                                    public void handleRecoverOk() {
                                                logger.error("This consumer recovered ok ---");
                                    }
                        });
            }


            public void closeConnection() throws IOException{
                        shutdownFired = true;
                        threadExecutor.shutdown();
                        consumerThread.getConnection().removeShutdownListener(connectionShutDownListener);
                        consumerThread.getConnection().close();
                        th.interrupt();

            }
            private class ConsumerThread implements Runnable {

                        private int numberOfConsumer;

                        private Connection connection;


                        public ConsumerThread(int numberOfConsumer, Connection connection) {
                                    this.numberOfConsumer = numberOfConsumer;
                                    this.connection = connection;
                        }

                        @Override
                        public void run() {
                                    try {
                                                createConsumers(numberOfConsumer, this.connection);
                                    } catch (IOException e) {
                                                // Keep the cause in the log; without it the failure is invisible.
                                                logger.error("could not create consumers; this thread is exiting", e);
                                    }
                        }

                        public Connection getConnection() {
                                    return connection;
                        }



            }

            private class ConnectionShutDownListener  implements ShutdownListener {

                        private int numberOfConsumer;
                        public ConnectionShutDownListener(int numberOfConsumer){
                                    this.numberOfConsumer = numberOfConsumer;
                        }
                        @Override
                        public void shutdownCompleted(ShutdownSignalException cause) {
                                    if(!shutdownFired){
                                                logger.error("connection lost; starting a new thread for a new connection...");
                                                Connection connection = getConnection() ;
                                                while(connection == null && !shutdownFired){
                                                            logger.error("failed to get a connection.....retrying...");
                                                            try {
                                                                        Thread.sleep(100);
                                                            } catch (InterruptedException e) {
                                                                        // Restore the interrupt flag and stop retrying.
                                                                        Thread.currentThread().interrupt();
                                                                        return;
                                                            }
                                                            connection = getConnection() ;
                                                }
                                                if (connection == null) {
                                                            // closeConnection() was called while we were retrying.
                                                            return;
                                                }
                                                logger.info("connection successfully regained");
                                                consumerThread= new ConsumerThread(numberOfConsumer, connection);
                                                th= new Thread(consumerThread);
                                                th.start();
                                    }
                        }

            }


            private class Task implements Runnable{

                        String message;

                        Channel channel;

                        long deliveryTag;

                        public Task(String message, Channel channel, long deliveryTag) {
                                    this.message = message;
                                    this.channel = channel;
                                    this.deliveryTag = deliveryTag;
                        }
                        @Override
                        public void run() {
                                    try{
                                                messageHandler.handleMessage(message);
                                    }finally{
                                                try {
                                                            channel.basicAck(deliveryTag, false);
                                                } catch (IOException e) {
                                                            logger.error("Error sending acknowledgement for delivery tag " + deliveryTag, e);
                                                }
                                    }
                        }
            }
}
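
One thing the Task class above does is acknowledge every delivery in a finally block, so a message whose handler throws is still acked and removed from the queue. The following is only a minimal sketch of one alternative, acking on success and nacking with requeue on failure. It is not the original code. Acker is a hypothetical stand-in for the two Channel methods basicAck(long, boolean) and basicNack(long, boolean, boolean), Handler stands in for BaseMessageHandler, and AckOnSuccessTask and RecordingAcker are illustrative names chosen here so the sketch runs without a broker.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class AckOnSuccessDemo {

    /** Hypothetical stand-in for Channel.basicAck / Channel.basicNack. */
    interface Acker {
        void basicAck(long deliveryTag, boolean multiple) throws IOException;
        void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
    }

    /** Hypothetical stand-in for BaseMessageHandler. */
    interface Handler {
        void handleMessage(String message) throws Exception;
    }

    /** Acks only if the handler returns normally; otherwise nacks and requeues. */
    static final class AckOnSuccessTask implements Runnable {
        private final String message;
        private final Acker channel;
        private final long deliveryTag;
        private final Handler handler;

        AckOnSuccessTask(String message, Acker channel, long deliveryTag, Handler handler) {
            this.message = message;
            this.channel = channel;
            this.deliveryTag = deliveryTag;
            this.handler = handler;
        }

        @Override
        public void run() {
            boolean handled = false;
            try {
                handler.handleMessage(message);
                handled = true;
            } catch (Exception e) {
                System.err.println("handler failed for delivery " + deliveryTag + ": " + e);
            }
            try {
                if (handled) {
                    channel.basicAck(deliveryTag, false);
                } else {
                    // requeue=true asks the broker to deliver the message again
                    channel.basicNack(deliveryTag, false, true);
                }
            } catch (IOException e) {
                System.err.println("could not ack/nack delivery " + deliveryTag + ": " + e);
            }
        }
    }

    /** Records calls instead of talking to a broker. */
    static final class RecordingAcker implements Acker {
        final List<String> calls = new ArrayList<>();

        @Override
        public void basicAck(long deliveryTag, boolean multiple) {
            calls.add("ack " + deliveryTag);
        }

        @Override
        public void basicNack(long deliveryTag, boolean multiple, boolean requeue) {
            calls.add("nack " + deliveryTag + " requeue=" + requeue);
        }
    }

    public static void main(String[] args) {
        RecordingAcker acker = new RecordingAcker();
        new AckOnSuccessTask("ok", acker, 1L, m -> { }).run();
        new AckOnSuccessTask("bad", acker, 2L, m -> {
            throw new IllegalStateException("boom");
        }).run();
        System.out.println(acker.calls); // prints [ack 1, nack 2 requeue=true]
    }
}
```

Requeueing on every failure can redeliver a permanently bad message in a loop, so whether to requeue, drop, or route such messages elsewhere is a choice that depends on the handler.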





The server crash logs are attached below.




-------------- next part --------------
A non-text attachment was scrubbed...
Name: master-sasl.log
Type: application/octet-stream
Size: 30196 bytes
Desc: master-sasl.log
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20120829/8f1ddbda/attachment.obj>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: master.log
Type: application/octet-stream
Size: 7284 bytes
Desc: master.log
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20120829/8f1ddbda/attachment-0001.obj>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: slave.log
Type: application/octet-stream
Size: 1698 bytes
Desc: slave.log
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20120829/8f1ddbda/attachment-0002.obj>

