[rabbitmq-discuss] amqp_channel_open issue
raghu a
a.raghu001 at gmail.com
Fri Feb 12 13:48:40 GMT 2010
Thanks for your response.
In my code, each thread uses a different channel.
Are there any multi-threaded C client samples available?
Could you also let me know what the other constraints are?
thanks,
Ragavendra.
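
A conservative reading of the constraint quoted below is to share nothing between threads, so that each thread opens and uses its own connection. The sketch below shows only that ownership pattern with pthreads. It does not call the RabbitMQ C client: demo_conn_t, demo_open and demo_publish are hypothetical stand-ins for a real connection and its calls.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical stand-in for a connection; NOT a rabbitmq-c type. */
typedef struct {
    int channel;      /* channel number used on this connection */
    long published;   /* messages "sent" through this connection */
} demo_conn_t;

/* Arguments and result slot for one worker thread. */
typedef struct {
    int channel;
    long messages;
    long result;
} worker_args_t;

/* Hypothetical open call: a real client would connect and log in here. */
static demo_conn_t demo_open(int channel)
{
    demo_conn_t c = { channel, 0 };
    return c;
}

/* Hypothetical publish call: only counts the message. */
static void demo_publish(demo_conn_t *c)
{
    c->published++;
}

/* Each worker creates its own connection on its own stack, so no
 * connection or channel is ever visible to another thread. */
static void *worker(void *p)
{
    worker_args_t *args = p;
    demo_conn_t conn = demo_open(args->channel);
    for (long i = 0; i < args->messages; i++)
        demo_publish(&conn);
    args->result = conn.published;
    return NULL;
}

/* Starts two workers and returns the total number of messages published. */
long run_workers(long messages_each)
{
    worker_args_t args[2] = { { 1, messages_each, 0 }, { 1, messages_each, 0 } };
    pthread_t threads[2];
    for (int i = 0; i < 2; i++) {
        int rc = pthread_create(&threads[i], NULL, worker, &args[i]);
        assert(rc == 0);
        (void)rc;
    }
    for (int i = 0; i < 2; i++)
        pthread_join(threads[i], NULL);
    return args[0].result + args[1].result;
}
```

Calling run_workers(1000) starts two threads that each publish 1000 messages through a private connection. The only data crossing thread boundaries is the per-thread argument struct, which is read by the caller after pthread_join.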
On Thu, Feb 11, 2010 at 9:37 PM, Matthew Sackman <matthew at lshift.net> wrote:
> Hi,
>
> I'm afraid the RabbitMQ-C client is highly experimental and not really
> receiving much development attention at the moment. We would not
> recommend it for production use - it has not been through our QA
> processes. I'm fairly sure that you need to make sure that you don't use
> a channel from more than one thread, but I have a feeling there are even
> more constraints than that. I'm afraid, if in doubt, don't use the C
> client, at the moment.
>
> Matthew
>
> On Wed, Feb 10, 2010 at 05:41:49PM +0000, raghu a wrote:
> > Hi,
> >
> > I am very new to RabbitMQ. We are using the RabbitMQ C API to talk to
> > the broker.
> > We are experimenting with the examples provided by RabbitMQ (with a few
> > modifications).
> > We created one producer and one consumer on the same connection but on
> > different channels.
> > In the following code, the amqp_channel_open(conn, 2) statement causes
> > my main thread to block, but if I move this statement before the
> > creation of the consumer thread, everything seems to work.
> >
> > My code is as follows.
> > static void send_batch(amqp_connection_state_t conn,
> >                        char const *queue_name,
> >                        int rate_limit,
> >                        int message_count)
> > {
> >   char message[256] = "test";
> >   die_on_error(amqp_basic_publish(conn,
> >                                   2,
> >                                   amqp_cstring_bytes("amq.direct"),
> >                                   amqp_cstring_bytes(queue_name),
> >                                   0,
> >                                   0,
> >                                   NULL,
> >                                   (amqp_bytes_t) {.len = sizeof(message),
> >                                                   .bytes = message}),
> >                "Publishing");
> >   printf("Message sent by producer\n");
> > }
> >
> > static void run(amqp_connection_state_t conn)
> > {
> >   amqp_frame_t frame;
> >   int result;
> >   size_t body_received;
> >   size_t body_target;
> >   int received = 0;
> >   while (1) {
> >     amqp_maybe_release_buffers(conn);
> >     /* Wait for the next frame; skip anything that is not basic.deliver. */
> >     result = amqp_simple_wait_frame(conn, &frame);
> >     if (result <= 0) return;
> >     if (frame.frame_type != AMQP_FRAME_METHOD)
> >       continue;
> >     if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
> >       continue;
> >     /* A content header must follow the delivery. */
> >     result = amqp_simple_wait_frame(conn, &frame);
> >     if (result <= 0) return;
> >     if (frame.frame_type != AMQP_FRAME_HEADER) {
> >       abort();
> >     }
> >     body_target = frame.payload.properties.body_size;
> >     body_received = 0;
> >     /* Read body frames until the whole message has arrived. */
> >     while (body_received < body_target) {
> >       result = amqp_simple_wait_frame(conn, &frame);
> >       if (result <= 0) return;
> >       if (frame.frame_type != AMQP_FRAME_BODY) {
> >         abort();
> >       }
> >       body_received += frame.payload.body_fragment.len;
> >       assert(body_received <= body_target);
> >     }
> >     received++;
> >     printf("received=%d\n", received);
> >   }
> > }
> >
> > void* consume(void* conn) {
> >   run(*((amqp_connection_state_t*)conn));
> >   return NULL;
> > }
> > int main(int argc, char const * const *argv) {
> >   char const *hostname = "127.0.0.1";
> >   int port = 5672;
> >   char const *exchange;
> >   char const *bindingkey;
> >   int sockfd;
> >   amqp_connection_state_t conn;
> >   amqp_bytes_t queuename;
> >   exchange = "amq.direct"; //argv[3];
> >   bindingkey = "test queue"; //argv[4];
> >   int rate_limit = 100;
> >   int message_count = 10000;
> >   conn = amqp_new_connection();
> >   die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
> >   amqp_set_sockfd(conn, sockfd);
> >   die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0,
> >                                AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
> >                     "Logging in");
> >   amqp_channel_open(conn, 1);
> >   die_on_amqp_error(amqp_rpc_reply, "Opening channel");
> >   {
> >     amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1,
> >                                                     AMQP_EMPTY_BYTES, 0, 0, 0, 1,
> >                                                     AMQP_EMPTY_TABLE);
> >     die_on_amqp_error(amqp_rpc_reply, "Declaring queue");
> >     queuename = amqp_bytes_malloc_dup(r->queue);
> >     if (queuename.bytes == NULL) {
> >       die_on_error(-ENOMEM, "Copying queue name");
> >     }
> >   }
> >   amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange),
> >                   amqp_cstring_bytes(bindingkey),
> >                   AMQP_EMPTY_TABLE);
> >   die_on_amqp_error(amqp_rpc_reply, "Binding queue");
> >
> >   amqp_basic_consume(conn, 1, queuename, AMQP_EMPTY_BYTES, 0, 1, 0);
> >   die_on_amqp_error(amqp_rpc_reply, "Consuming");
> >   pthread_t threadId;
> >   pthread_create(&threadId, NULL, consume, (void*)&conn);
> >   sleep(1);
> >   amqp_channel_open(conn, 2);
> >   send_batch(conn, "test queue", rate_limit, message_count);
> >   die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
> >                     "Closing channel");
> >   die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
> >                     "Closing connection");
> >   amqp_destroy_connection(conn);
> >   die_on_error(close(sockfd), "Closing socket");
> >   return 0;
> > }
> >
> > Could you please explain why it blocks?
> >
> > Thanks,
> > Ragavendra.
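
One plausible explanation for the blocking, offered as an assumption rather than a confirmed diagnosis: the consumer thread in the code above sits in amqp_simple_wait_frame on the same connection and skips every frame that is not a delivery. If it happens to read the broker's reply to amqp_channel_open(conn, 2) first, that reply is discarded and the main thread keeps waiting for it. The sketch below models this race in plain C with no threads and no RabbitMQ calls. frame_t, wire_t and every function in it are hypothetical stand-ins, not rabbitmq-c types.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical frame model; NOT the rabbitmq-c amqp_frame_t. */
typedef enum { FRAME_OPEN_OK, FRAME_DELIVER } frame_kind_t;

typedef struct {
    int channel;
    frame_kind_t kind;
} frame_t;

/* Hypothetical stand-in for the one socket that both threads read from. */
typedef struct {
    const frame_t *frames;
    size_t count;
    size_t next;
} wire_t;

/* Takes the next frame off the wire; returns 0 when none is left. */
static int wire_read(wire_t *w, frame_t *out)
{
    if (w->next >= w->count)
        return 0;
    *out = w->frames[w->next++];
    return 1;
}

/* One iteration of a consumer loop like run(): read whatever comes next
 * and ignore it unless it is a delivery.
 * Returns 1 for a delivery, 0 for a discarded frame, -1 for an empty wire. */
static int consumer_step(wire_t *w)
{
    frame_t f;
    if (!wire_read(w, &f))
        return -1;
    return f.kind == FRAME_DELIVER ? 1 : 0;
}

/* The thread opening a channel: scans for open-ok on that channel.
 * Returns 1 if found, 0 if the wire ran dry (a real client would block). */
static int opener_wait_open_ok(wire_t *w, int channel)
{
    frame_t f;
    while (wire_read(w, &f)) {
        if (f.kind == FRAME_OPEN_OK && f.channel == channel)
            return 1;
    }
    return 0;
}

/* The consumer wins the race and swallows the reply for channel 2. */
int reply_seen_when_consumer_reads_first(void)
{
    static const frame_t incoming[] = { { 2, FRAME_OPEN_OK } };
    wire_t w = { incoming, 1, 0 };
    int discarded = consumer_step(&w);
    assert(discarded == 0);
    (void)discarded;
    return opener_wait_open_ok(&w, 2);
}

/* No consumer is running yet, so the opener receives its own reply. */
int reply_seen_when_opener_reads_alone(void)
{
    static const frame_t incoming[] = { { 2, FRAME_OPEN_OK } };
    wire_t w = { incoming, 1, 0 };
    return opener_wait_open_ok(&w, 2);
}
```

The second scenario corresponds to moving amqp_channel_open(conn, 2) before pthread_create, which is the ordering reported to work.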
>
> > _______________________________________________
> > rabbitmq-discuss mailing list
> > rabbitmq-discuss at lists.rabbitmq.com
> > http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss