[rabbitmq-discuss] Problem with BasicProperties (bug?)
radisb
radisb at gmail.com
Sat Apr 24 17:47:45 BST 2010
The code below does the following: it sends 1000 messages (each tagged with a key
via BasicProperties.getHeaders().put("MESSAGE_KEY", value) and stored in a map)
directly to QueueA and then waits in a loop for incoming replies on the
queue Replies.
It then fires 2 Threads (SimpleForwarder class). The first thread waits for
messages on QueueA and upon receipt it sends them to QueueB. The second
Thread waits on QueueB and sends to Replies.
When a message is received on Replies, its key (taken from the header in
the delivery properties) is removed from a map. The test finishes
successfully if all keys sent were received and removed from the map.
The problem is in the forwarding threads' code. If, before forwarding the
message, the thread recreates the BasicProperties and sends a fresh
BasicProperties, this causes the consumer on Replies to lose messages and
get duplicates in their place. Sometimes the forwarder throws a null pointer
exception because it can't find the header that holds the key (this shouldn't
happen, because all messages are assigned one before sending). What is strange is
that if the 2 forwarders are started in separate JVMs the problem
disappears. I have marked the problematic area with /* -- PROBLEM -- */
Note that the lost messages are random and the problem might not manifest if
run for a few messages. Above 400-500 it always happens for me.
I use server 1.7.2 and javaclient 1.7.2.
Any suggestions?
Here is the code:
http://old.nabble.com/file/p28351417/Tester.java Tester.java
package test;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConnectionParameters;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.impl.LongString;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
public class Tester
{
ConnectionParameters parameters;
Channel senderChannel;
Channel receiverChannel;
QueueingConsumer consumer;
String sourceQueue;
String targetRoutingKey;
public Tester(ConnectionParameters parameters, String sourceQueue, String
targetRoutingKey)
{
this.parameters = parameters;
this.sourceQueue = sourceQueue;
this.targetRoutingKey = targetRoutingKey;
}
public void test(int messageCount) throws IOException
{
senderChannel = new
ConnectionFactory(parameters).newConnection("localhost").createChannel();
receiverChannel = new
ConnectionFactory(parameters).newConnection("localhost").createChannel();
receiverChannel.queueDeclare(sourceQueue, false);
receiverChannel.queueBind(sourceQueue, "amq.direct", sourceQueue);
consumer = new QueueingConsumer(receiverChannel);
String dummyData = "dummyData";
Set<String>sentKeys = new HashSet<String>();
int c = 0;
while(c < messageCount)
{
c++;
String key = String.valueOf(c);
BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;
props.setHeaders(new HashMap<String, Object>());
props.getHeaders().put("MESSAGE_KEY", key);
senderChannel.basicPublish("amq.direct", targetRoutingKey, props,
dummyData.getBytes());
sentKeys.add(key);
}
System.out.println("Sent " + c + " messages. Firing up consumer to wait
for replies:\n");
receiverChannel.basicConsume(sourceQueue, false, consumer);
while(true)
{
try
{
Delivery delivery = consumer.nextDelivery(1000);
if(delivery != null)
{
receiverChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
String receivedKey = new
String(((LongString)delivery.getProperties().getHeaders().get("MESSAGE_KEY")).getBytes());
System.out.print("Key: " + receivedKey);
if(!sentKeys.remove(receivedKey))
System.out.println(" ...Already received");
else
{
System.out.println(" ...OK");
if(sentKeys.isEmpty())
{
System.out.println("All keys removed");
close();
return;
}
}
}
else
{
System.out.println("Timed out.");
}
}
catch(InterruptedException ex){System.err.println("Interrupted while
waiting for message. Exiting");return;}
catch(ShutdownSignalException sse){ System.err.println("Caught shutdown
signal. Exiting");return;}
catch(IOException ex){System.err.println("Unexpected IO error.
Exiting");}
}
}
private void close() throws IOException
{
senderChannel.getConnection().close();
receiverChannel.getConnection().close();
}
public static class SimpleForwarder extends Thread
{
ConnectionParameters parameters;
Channel senderChannel;
Channel receiverChannel;
QueueingConsumer consumer;
String sourceQueue;
String targetRoutingKey;
public SimpleForwarder(ConnectionParameters parameters, String
sourceQueue, String targetRoutingKey)
{
this.parameters = parameters;
this.sourceQueue = sourceQueue;
this.targetRoutingKey = targetRoutingKey;
this.setName(sourceQueue);
}
private void init() throws IOException
{
senderChannel = new
ConnectionFactory(parameters).newConnection("localhost").createChannel();
receiverChannel = new
ConnectionFactory(parameters).newConnection("localhost").createChannel();
receiverChannel.basicQos(1);
receiverChannel.queueDeclare(sourceQueue, false);
receiverChannel.queueBind(sourceQueue, "amq.direct", sourceQueue);
consumer = new QueueingConsumer(receiverChannel);
receiverChannel.basicConsume(sourceQueue, false, consumer);
}
@Override
public void run()
{
try{init();}catch(IOException ex){System.err.println("Error initializing.
Exiting.");close();return;}
while(!Thread.currentThread().isInterrupted())
{
try
{
Delivery delivery = consumer.nextDelivery(5000);
if(delivery != null)
{
String key = new
String(((LongString)delivery.getProperties().getHeaders().get("MESSAGE_KEY")).getBytes());
String data = new String(delivery.getBody());
/* -- PROBLEM -- */
//--------- THIS CAUSES the problem. (BUT ONLY WHEN SimpleForwarders are
running IN THE SAME JVM).
BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;
//--------- THIS NEVER CAUSES PROBLEM EVEN when SimpleForwarders IN THE
SAME JVM
// BasicProperties props = delivery.getProperties();
/* -- END OF PROBLEM -- */
props.setHeaders(new HashMap<String, Object>());
props.getHeaders().put("MESSAGE_KEY", key);
senderChannel.basicPublish("amq.direct", targetRoutingKey, props ,
data.getBytes());
receiverChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
}
}
catch(InterruptedException ex){System.err.println("[" +
Thread.currentThread().getName() + "] Interrupted while waiting for message.
Exiting");close();return;}
catch(ShutdownSignalException sse){ System.err.println("[" +
Thread.currentThread().getName() + "] Caught shutdown signal.
Exiting");close();return;}
catch(IOException ex){System.err.println("[" +
Thread.currentThread().getName() + "] Unexpected IO error.
Exiting");return;}
}
}
public void close()
{
try
{
senderChannel.getConnection().close();
receiverChannel.getConnection().close();
}
catch(IOException ex)
{
System.err.println("Close failed");
}
}
}
public static void main(String[] args) throws IOException
{
ConnectionParameters params = new ConnectionParameters();
params.setPassword("guest");
params.setUsername("guest");
params.setVirtualHost("/");
SimpleForwarder f1 = new SimpleForwarder(params, "QueueA", "QueueB");
SimpleForwarder f2 = new SimpleForwarder(params, "QueueB", "Replies");
f1.start();
f2.start();
Tester tester = new Tester(params, "Replies", "QueueA");
tester.test(1000);
}
}
--
View this message in context: http://old.nabble.com/Problem-with-BasicProperties-%28bug-%29-tp28351417p28351417.html
Sent from the RabbitMQ mailing list archive at Nabble.com.
More information about the rabbitmq-discuss
mailing list