[rabbitmq-discuss] amqp_channel_open issue

raghu a a.raghu001 at gmail.com
Fri Feb 12 13:48:40 GMT 2010


Thanks for your response.
As per my code each thread is using different channel.
May I get any  multi threaded C client samples?
Could you please let me know the constraints also?

thanks,
Ragavendra.


On Thu, Feb 11, 2010 at 9:37 PM, Matthew Sackman <matthew at lshift.net> wrote:

> Hi,
>
> I'm afraid the RabbitMQ-C client is highly experimental and not really
> receiving much development attention at the moment. We would not
> recommend it for production use - it has not been through our QA
> processes. I'm fairly sure that you need to make sure that you don't use
> a channel from more than one thread, but I have a feeling there are even
> more constraints than that. I'm afraid, if in doubt, don't use the C
> client, at the moment.
>
> Matthew
>
> On Wed, Feb 10, 2010 at 05:41:49PM +0000, raghu a wrote:
> > Hi,
> >
> > I am very new to RabbitMQ. we are using Rabbit MQ C API to talk with the
> > broker.
> > We are playing with the examples(with few modifications) given by
> RabbitMQ.
> > We created one producer and one consumer with the same connection but
> > difference channels.
> > In the following code amqp_channel_open(conn, 2) statement is causing the
> my
> > main thread to block
> > but if i move this statement before creating of consumer thread then
> every
> > thing seems to be working.
> >
> > My code is as follows.
> > static void send_batch(amqp_connection_
> > state_t conn,
> >                        char const *queue_name,
> >                        int rate_limit,
> >                        int message_count)
> > {
> >   char message[256] = "test";
> >   die_on_error(amqp_basic_publish(conn,
> >                                     2,
> >                                     amqp_cstring_bytes("amq.direct"),
> >                                     amqp_cstring_bytes(queue_name),
> >                                     0,
> >                                     0,
> >                                     NULL,
> >                                     (amqp_bytes_t) {.len =
> sizeof(message),
> > .bytes = message}),
> >                  "Publishing");
> >     printf("Message sent by producer\n");
> > }
> >
> > static void run(amqp_connection_state_t conn)
> > {
> >   amqp_frame_t frame;
> >   int result;
> >   size_t body_received;
> >   size_t body_target;
> >   int  received = 0;
> >   while (1) {
> >     amqp_maybe_release_buffers(conn);
> >     result = amqp_simple_wait_frame(conn, &frame);
> >     if (result <= 0) return;
> >     if (frame.frame_type != AMQP_FRAME_METHOD)
> >       continue;
> >     if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
> >       continue;
> >     result = amqp_simple_wait_frame(conn, &frame);
> >     if (result <= 0) return;
> >     if (frame.frame_type != AMQP_FRAME_HEADER) {
> >       abort();
> >     }
> >     body_target = frame.payload.properties.body_size;
> >     body_received = 0;
> >     while (body_received < body_target) {
> >       result = amqp_simple_wait_frame(conn, &frame);
> >       if (result <= 0) return;
> >       if (frame.frame_type != AMQP_FRAME_BODY) {
> >     abort();
> >       }
> >       body_received += frame.payload.body_fragment.len;
> >       assert(body_received <= body_target);
> >     }
> >     received++;
> >     printf("received=%d\n",received);
> >   }
> > }
> > void*  consume(void* conn){
> >   run(*((amqp_connection_state_t*)conn));
> > }
> > int main(int argc, char const * const *argv) {
> >   char const *hostname="127.0.0.1";
> >   int port="5672";
> >   char const *exchange;
> >   char const *bindingkey;
> >   int sockfd;
> >   amqp_connection_state_t conn;
> >   amqp_bytes_t queuename;
> >   exchange = "amq.direct"; //argv[3];
> >   bindingkey = "test queue"; //argv[4];
> >   int rate_limit = 100;
> >   int message_count = 10000;
> >   conn = amqp_new_connection();
> >   die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening
> socket");
> >   amqp_set_sockfd(conn, sockfd);
> >   die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0,
> > AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
> >             "Logging in");
> >   amqp_channel_open(conn, 1);
> >   die_on_amqp_error(amqp_rpc_reply, "Opening channel");
> >   {
> >     amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1,
> > AMQP_EMPTY_BYTES, 0, 0, 0, 1,
> >                             AMQP_EMPTY_TABLE);
> >     die_on_amqp_error(amqp_rpc_reply, "Declaring queue");
> >     queuename = amqp_bytes_malloc_dup(r->queue);
> >     if (queuename.bytes == NULL) {
> >       die_on_error(-ENOMEM, "Copying queue name");
> >     }
> >   }
> >   amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange),
> > amqp_cstring_bytes(bindingkey),
> >           AMQP_EMPTY_TABLE);
> >   die_on_amqp_error(amqp_rpc_reply, "Binding queue");
> >
> >   amqp_basic_consume(conn, 1, queuename, AMQP_EMPTY_BYTES, 0, 1, 0);
> >   die_on_amqp_error(amqp_rpc_reply, "Consuming");
> >   pthread_t threadId;
> >   pthread_create(&threadId,NULL,consume,(void*)&conn);
> >   sleep(1);
> >   amqp_channel_open(conn, 2);
> >   send_batch(conn, "test queue", rate_limit, message_count);
> >   die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
> > "Closing channel");
> >   die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
> > "Closing connection");
> >   amqp_destroy_connection(conn);
> >   die_on_error(close(sockfd), "Closing socket");
> >   return 0;
> > }
> >
> > Please provide me the reasons for blocking.
> >
> > Thanks,
> > Ragavendra.
>
> > _______________________________________________
> > rabbitmq-discuss mailing list
> > rabbitmq-discuss at lists.rabbitmq.com
> > http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>
>
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss at lists.rabbitmq.com
> http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20100212/220ff64a/attachment.htm 


More information about the rabbitmq-discuss mailing list