[rabbitmq-discuss] Publishing binary data using the java client

Christian Strack strack at Mathematik.Uni-Marburg.de
Sat Jan 12 17:44:16 GMT 2013


Hello Matthias and Michael, and thank you for your fast response. This
mail contains the full code, including the encoding and decoding:

package de.bench.message.transport;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;

public class AMQPTransport extends Transport{

     Connection connection;
     Channel channel;
     String exchange;
     QueueingConsumer consumer;
     int role;

     public AMQPTransport() {
         // TODO Auto-generated constructor stub
     }

     @Override
     public void initialise(String info) {
         String[] params = info.split("/");
         ConnectionFactory factory = new ConnectionFactory();
         factory.setHost(params[0]);
         factory.setUsername(params[1]);
         factory.setPassword(params[2]);
         exchange = params[3];
         role = Integer.parseInt(params[4]);


         try {
             connection = factory.newConnection();
             channel = connection.createChannel();
             channel.exchangeDeclare(exchange, "fanout");

             if(role == 1){
                 String queue = channel.queueDeclare().getQueue();
                 channel.queueBind(queue, exchange, "");
                 consumer = new QueueingConsumer(channel);
                 channel.basicConsume(queue, true, consumer);
             }
         } catch (IOException e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
         }


     }

     @Override
     public int write(byte[] sendData, int len) {
         try {
             channel.basicPublish(exchange, "", false,
                     MessageProperties.BASIC, sendData);
         } catch (IOException e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
         }
         return 0;
     }

     @Override
     public int read(byte[] data, int len) {
         try {
             Delivery delivery = consumer.nextDelivery();
             if (delivery != null){
                 data = delivery.getBody();
                 System.out.println(new String(data));
                 return len;
             }
             return 0;
         } catch (ShutdownSignalException e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
             return -1;
         } catch (ConsumerCancelledException e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
             return -2;
         } catch (InterruptedException e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
             return -3;
         }
     }

     @Override
     public void close() {
         try {
             channel.close();
             connection.close();
         } catch (IOException e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
         }


     }

}

Sender:
     public void benchmarkPackageLossSender(Transport t, int count)
             throws IOException{
         bwLogWriter.write("Benchmark Package Loss As Sender");
         bwLogWriter.newLine();
         bwLogWriter.write("Count: "+count);
         bwLogWriter.newLine();
         bwLogWriter.newLine();

         for(int i=0; i<count;i++){
             ByteBuffer bb = ByteBuffer.allocate(4);
             byte[] data = bb.putInt(i).array();
             System.out.println("Sent package #"+i);
             t.write(data, data.length);
         }
         bwLogWriter.write("Benchmark finished");
         bwLogWriter.newLine();
     }

Receiver:
     public void benchmarkPackageLossReceiver(Transport t, int count)
             throws IOException{
         bwLogWriter.write("Benchmark Package Loss As Receiver");
         bwLogWriter.newLine();
         bwLogWriter.write("Count: "+count);
         bwLogWriter.newLine();
         bwLogWriter.newLine();

         System.out.println("Begin Benchmark");

         byte[] data = new byte[4];
         int last_value = -1;
         int new_value = -1;

         for(int i=0; i<count;i++){
             t.read(data, 4);
             System.out.println("Received Package #"+i);
             new_value = ByteBuffer.wrap(data).getInt();
             // Here only random bytes are printed
             System.out.println("Content: "+new_value);
             if(new_value != ++last_value){
                 bwLogWriter.write("Message Loss! Last Message: "
                         + last_value + "\t New Message: " + new_value);
                 bwLogWriter.newLine();
             }
             last_value = new_value;
         }
         System.out.println("Benchmark finished");
         bwLogWriter.write("Benchmark finished");
         bwLogWriter.newLine();
     }
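
A note on the read() method above, offered as a possible explanation
rather than a confirmed diagnosis: Java passes array references by value,
so the assignment "data = delivery.getBody()" rebinds only the local
parameter and leaves the receiver's own 4-byte array untouched. The
println inside read() also renders the binary payload as text, which can
look like random characters. The sketch below reproduces this in
isolation, without RabbitMQ; the class name ReadCopyDemo and both helper
methods are hypothetical and exist only for illustration.

```java
import java.nio.ByteBuffer;

public class ReadCopyDemo {

    // Mirrors the pattern in read(): assigning to the parameter only
    // rebinds the local variable, so the caller's array is not modified.
    static int readByReassign(byte[] data, byte[] body) {
        data = body;
        return data.length;
    }

    // Copies the payload into the caller's buffer instead of rebinding.
    static int readByCopy(byte[] data, byte[] body) {
        int n = Math.min(data.length, body.length);
        System.arraycopy(body, 0, data, 0, n);
        return n;
    }

    public static void main(String[] args) {
        // Stand-in for delivery.getBody(): the integer 42 as 4 bytes.
        byte[] body = ByteBuffer.allocate(4).putInt(42).array();

        byte[] a = new byte[4];
        readByReassign(a, body);
        System.out.println("reassign: " + ByteBuffer.wrap(a).getInt()); // prints "reassign: 0"

        byte[] b = new byte[4];
        readByCopy(b, body);
        System.out.println("copy: " + ByteBuffer.wrap(b).getInt()); // prints "copy: 42"
    }
}
```

If this is indeed the cause, copying the delivery body into the supplied
array inside read() (for example with System.arraycopy) and returning the
number of bytes copied should let the receiver decode the integers.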

On 01/12/2013 06:04 PM, Matthias Radestock wrote:
> Christian,
>
> On 12/01/13 14:41, Christian Strack wrote:
> >> For benchmarking purposes I have set up an abstract testing suite to
>> transfer data using several transport implementations. The transmitted
>> data are integers converted to byte arrays using ByteBuffer. While the
>> en- and decoding of these data is successful using UDP sockets, the
>> decoding fails when being transmitted using rabbitmq and the rabbitmq
>> java libraries. The results are just random bytes but the array size is
>> correct. I have tried to set the encoding to different BasicProperties
>> which had no impact. Is there a flag that needs to be set to prevent
>> that rabbitmq alters the data? Below are the important snippets of my 
>> code.
>
> The RabbitMQ server does not touch the message body. So this is either 
> a problem with your code or, far less likely, the client library.
>
> I cannot see anything obviously wrong with the snippet you sent, but 
> since it's only a snippet the error is probably elsewhere. So please 
> submit a complete, self-contained program that publishes and consumes 
> a single byte[] message and exhibits the problem.
>
> Regards,
>
> Matthias.