[rabbitmq-discuss] amqp_channel_open issue

Matthew Sackman matthew at lshift.net
Thu Feb 11 16:07:33 GMT 2010


Hi,

I'm afraid the RabbitMQ-C client is highly experimental and not really
receiving much development attention at the moment. We would not
recommend it for production use - it has not been through our QA
processes. I'm fairly sure that you need to make sure that you don't use
a channel from more than one thread, but I have a feeling there are even
more constraints than that. I'm afraid, if in doubt, don't use the C
client, at the moment.

Matthew

On Wed, Feb 10, 2010 at 05:41:49PM +0000, raghu a wrote:
> Hi,
> 
> I am very new to RabbitMQ. we are using Rabbit MQ C API to talk with the
> broker.
> We are playing with the examples(with few modifications) given by RabbitMQ.
> We created one producer and one consumer with the same connection but
> difference channels.
> In the following code amqp_channel_open(conn, 2) statement is causing the my
> main thread to block
> but if i move this statement before creating of consumer thread then every
> thing seems to be working.
> 
> My code is as follows.
> static void send_batch(amqp_connection_
> state_t conn,
>                        char const *queue_name,
>                        int rate_limit,
>                        int message_count)
> {
>   char message[256] = "test";
>   die_on_error(amqp_basic_publish(conn,
>                                     2,
>                                     amqp_cstring_bytes("amq.direct"),
>                                     amqp_cstring_bytes(queue_name),
>                                     0,
>                                     0,
>                                     NULL,
>                                     (amqp_bytes_t) {.len = sizeof(message),
> .bytes = message}),
>                  "Publishing");
>     printf("Message sent by producer\n");
> }
> 
> static void run(amqp_connection_state_t conn)
> {
>   amqp_frame_t frame;
>   int result;
>   size_t body_received;
>   size_t body_target;
>   int  received = 0;
>   while (1) {
>     amqp_maybe_release_buffers(conn);
>     result = amqp_simple_wait_frame(conn, &frame);
>     if (result <= 0) return;
>     if (frame.frame_type != AMQP_FRAME_METHOD)
>       continue;
>     if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
>       continue;
>     result = amqp_simple_wait_frame(conn, &frame);
>     if (result <= 0) return;
>     if (frame.frame_type != AMQP_FRAME_HEADER) {
>       abort();
>     }
>     body_target = frame.payload.properties.body_size;
>     body_received = 0;
>     while (body_received < body_target) {
>       result = amqp_simple_wait_frame(conn, &frame);
>       if (result <= 0) return;
>       if (frame.frame_type != AMQP_FRAME_BODY) {
>     abort();
>       }
>       body_received += frame.payload.body_fragment.len;
>       assert(body_received <= body_target);
>     }
>     received++;
>     printf("received=%d\n",received);
>   }
> }
> void*  consume(void* conn){
>   run(*((amqp_connection_state_t*)conn));
> }
> int main(int argc, char const * const *argv) {
>   char const *hostname="127.0.0.1";
>   int port="5672";
>   char const *exchange;
>   char const *bindingkey;
>   int sockfd;
>   amqp_connection_state_t conn;
>   amqp_bytes_t queuename;
>   exchange = "amq.direct"; //argv[3];
>   bindingkey = "test queue"; //argv[4];
>   int rate_limit = 100;
>   int message_count = 10000;
>   conn = amqp_new_connection();
>   die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
>   amqp_set_sockfd(conn, sockfd);
>   die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0,
> AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
>             "Logging in");
>   amqp_channel_open(conn, 1);
>   die_on_amqp_error(amqp_rpc_reply, "Opening channel");
>   {
>     amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1,
> AMQP_EMPTY_BYTES, 0, 0, 0, 1,
>                             AMQP_EMPTY_TABLE);
>     die_on_amqp_error(amqp_rpc_reply, "Declaring queue");
>     queuename = amqp_bytes_malloc_dup(r->queue);
>     if (queuename.bytes == NULL) {
>       die_on_error(-ENOMEM, "Copying queue name");
>     }
>   }
>   amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange),
> amqp_cstring_bytes(bindingkey),
>           AMQP_EMPTY_TABLE);
>   die_on_amqp_error(amqp_rpc_reply, "Binding queue");
> 
>   amqp_basic_consume(conn, 1, queuename, AMQP_EMPTY_BYTES, 0, 1, 0);
>   die_on_amqp_error(amqp_rpc_reply, "Consuming");
>   pthread_t threadId;
>   pthread_create(&threadId,NULL,consume,(void*)&conn);
>   sleep(1);
>   amqp_channel_open(conn, 2);
>   send_batch(conn, "test queue", rate_limit, message_count);
>   die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
> "Closing channel");
>   die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
> "Closing connection");
>   amqp_destroy_connection(conn);
>   die_on_error(close(sockfd), "Closing socket");
>   return 0;
> }
> 
> Please provide me the reasons for blocking.
> 
> Thanks,
> Ragavendra.

> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss at lists.rabbitmq.com
> http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss





More information about the rabbitmq-discuss mailing list