[rabbitmq-discuss] RabbitMQ Subscriber Timeout When Waiting for Delivery

Deelo dwayne.dsouza at gmail.com
Fri Jun 29 09:22:37 BST 2012


My subscriber appears to timeout and not process any messages if the 
publisher takes a long time to initiate. (e.g. I start the publisher 30 
minutes after the subscriber).
Is there a way to change this?

Appreciate any help as this is causing all kinds of problems!


public class RabbitMqSubscriber<T extends Serializable> implements 
Subscriber<T> {
private static final Logger log = 
LoggerFactory.getLogger(RabbitMqSubscriber.class);
private QueueingConsumer consumer;
private MessageListener<T> listener;
private String exchange;
private String topic;
public RabbitMqSubscriber(String host,String exchange,String topic) throws 
IOException {
this.exchange=exchange;
this.topic=topic;
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchange, "topic");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchange, topic);
consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
}

public void run() {
log.info("Starting subscribing to messages on exchange: "+exchange + " with 
topic: "+topic);
while (true) {
QueueingConsumer.Delivery delivery;
try {
delivery = consumer.nextDelivery();
Object o=SerializationUtils.deserialize(delivery.getBody());
log.info("Subscriber received object: "+o);
listener.receive((T)o);
} catch (ShutdownSignalException | ConsumerCancelledException | 
InterruptedException e) {
throw new RuntimeException(e);
}
}
}

@Override
public void setListener(MessageListener<T> listener) {
this.listener=listener;
}
}

public class RabbitMqPublisher<T extends Serializable> implements 
Publisher<T>{
private String topic;
private Channel channel;
private String exchange;
private static final Logger log = 
LoggerFactory.getLogger(RabbitMqPublisher.class);
public RabbitMqPublisher(String host,String exchange,String topic) throws 
IOException {
super();
this.topic = topic;
this.exchange=exchange;
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(host);
    Connection connection = factory.newConnection();
    channel = connection.createChannel();
        channel.exchangeDeclare(exchange, "topic");
log.info("Publishing to messages on exchange: "+exchange + " with topic: 
"+topic);
}
@Override
public void publish(T t) {
    try {
     log.info("Publishing object: "+t);
        channel.basicPublish(exchange, topic, null, 
SerializationUtils.serialize(t));
} catch (IOException e) {
throw new RuntimeException(e);
}

}
}

public class TimeoutTest {
private static final Logger log = 
LoggerFactory.getLogger(TimeoutTest.class);
public static void main(String[] args) throws Exception{
final RabbitMqPublisher<String> publisher=new 
RabbitMqPublisher<String>("31.222.170.242","EXCHANGE","TEST");
final RabbitMqSubscriber<String> subscriber=new 
RabbitMqSubscriber<String>("31.222.170.242","EXCHANGE", "TEST");
subscriber.setListener(new MessageListener<String>() {
 @Override
public void receive(String s) {
System.out.println("Received: "+s); //NEVER GETS CALLED!!!
 }
});
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
subscriber.run();
}
});
 //start subscriber then wait 30 minutes before publishing first message 
Thread.sleep(1000*1800);
 publisher.publish("MESSAGE PUBLISHED");
}
}

-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20120629/f564f260/attachment.htm>


More information about the rabbitmq-discuss mailing list