[rabbitmq-discuss] Publishing binary data using the java client
Christian Strack
strack at Mathematik.Uni-Marburg.de
Sat Jan 12 17:44:16 GMT 2013
Hello Matthias and Michael and thank you for your fast response. In this
mail is the full code including de- and encoding:
package de.bench.message.transport;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
public class AMQPTransport extends Transport{
Connection connection;
Channel channel;
String exchange;
QueueingConsumer consumer;
int role;
public AMQPTransport() {
// TODO Auto-generated constructor stub
}
@Override
public void initialise(String info) {
String[] params = info.split("/");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(params[0]);
factory.setUsername(params[1]);
factory.setPassword(params[2]);
exchange = params[3];
role = Integer.parseInt(params[4]);
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare(exchange, "fanout");
if(role == 1){
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue, exchange, "");
consumer = new QueueingConsumer(channel);
channel.basicConsume(queue, true, consumer);
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public int write(byte[] sendData, int len) {
try {
channel.basicPublish(exchange, "", false,
MessageProperties.BASIC, sendData);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return 0;
}
@Override
public int read(byte[] data, int len) {
try {
Delivery delivery = consumer.nextDelivery();
if (delivery != null){
data = delivery.getBody();
System.out.println(new String(data));
return len;
}
return 0;
} catch (ShutdownSignalException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return -1;
} catch (ConsumerCancelledException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return -2;
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return -3;
}
}
@Override
public void close() {
try {
channel.close();
connection.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Sender:
public void benchmarkPackageLossSender(Transport t, int count)
throws IOException{
bwLogWriter.write("Benchmark Package Loss As Sender");
bwLogWriter.newLine();
bwLogWriter.write("Count: "+count);
bwLogWriter.newLine();
bwLogWriter.newLine();
for(int i=0; i<count;i++){
ByteBuffer bb = ByteBuffer.allocate(4);
byte[] data = bb.putInt(i).array();
System.out.println("Sent package #"+i);
t.write(data, data.length);
}
bwLogWriter.write("Benchmark finished");
bwLogWriter.newLine();
}
Receiver
public void benchmarkPackageLossReceiver(Transport t, int count)
throws IOException{
bwLogWriter.write("Benchmark Package Loss As Receiver");
bwLogWriter.newLine();
bwLogWriter.write("Count: "+count);
bwLogWriter.newLine();
bwLogWriter.newLine();
System.out.println("Begin Benchmark");
byte[] data = new byte[4];
int last_value = -1;
int new_value = -1;
for(int i=0; i<count;i++){
t.read(data, 4);
System.out.println("Received Package #"+i);
new_value = ByteBuffer.wrap(data).getInt();
// Here only random bytes are printed
System.out.println("Content: "+new_value);
if(new_value != ++last_value){
bwLogWriter.write("Message Loss! Last Message:
"+last_value+"\t New Message: "+new_value);
bwLogWriter.newLine();
}
last_value = new_value;
}
System.out.println("Benchmark finished");
bwLogWriter.write("Benchmark finished");
bwLogWriter.newLine();
}
On 01/12/2013 06:04 PM, Matthias Radestock wrote:
> Christian,
>
> On 12/01/13 14:41, Christian Strack wrote:
>> for benchmarking purposes i have set up a an astract testing suite to
>> transfer data using several transport implementations. The transmitted
>> data are integers converted to byte arrays using ByteBuffer. While the
>> en- and decoding of these data is successful using UDP sockets, the
>> decoding fails when being transmitted using rabbitmq and the rabbitmq
>> java libraries. The results are just random bytes but the array size is
>> correct. I have tried to set the encoding to different BasicProperties
>> which had no impact. Is there a flag that needs to be set to prevent
>> that rabbitmq alters the data? Below are the important snippets of my
>> code.
>
> The RabbitMQ server does not touch the message body. So this is either
> a problem with your code or, far less likely, the client library.
>
> I cannot see anything obviously wrong with the snippet you sent, but
> since it's only a snippet the error is probably elsewhere. So please
> submit a complete, self-contained program that publishes and consumes
> a single byte[] message and exhibits the problem.
>
> Regards,
>
> Matthias.
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss at lists.rabbitmq.com
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
More information about the rabbitmq-discuss
mailing list