Thanks for your response.<br>As per my code, each thread is using a different channel.<br>May I get any multi-threaded C client samples?<br>Could you please let me know the constraints also?<br><br>thanks,<br>Ragavendra.<br>
<br><br><div class="gmail_quote">On Thu, Feb 11, 2010 at 9:37 PM, Matthew Sackman <span dir="ltr">&lt;<a href="mailto:matthew@lshift.net">matthew@lshift.net</a>&gt;</span> wrote:<br><blockquote class="gmail_quote" style="border-left: 1px solid rgb(204, 204, 204); margin: 0pt 0pt 0pt 0.8ex; padding-left: 1ex;">
Hi,<br>
<br>
I'm afraid the RabbitMQ-C client is highly experimental and not really<br>
receiving much development attention at the moment. We would not<br>
recommend it for production use - it has not been through our QA<br>
processes. I'm fairly sure that you need to make sure that you don't use<br>
a channel from more than one thread, but I have a feeling there are even<br>
more constraints than that. I'm afraid, if in doubt, don't use the C<br>
client, at the moment.<br>
<br>
Matthew<br>
<div><div></div><div class="h5"><br>
On Wed, Feb 10, 2010 at 05:41:49PM +0000, raghu a wrote:<br>
> Hi,<br>
><br>
> I am very new to RabbitMQ. We are using the RabbitMQ C API to talk with the<br>
> broker.<br>
> We are playing with the examples (with a few modifications) given by RabbitMQ.<br>
> We created one producer and one consumer with the same connection but<br>
> different channels.<br>
> In the following code, the amqp_channel_open(conn, 2) statement is causing my<br>
> main thread to block,<br>
> but if I move this statement before the creation of the consumer thread then<br>
> everything seems to be working.<br>
><br>
> My code is as follows.<br>
> static void send_batch(amqp_connection_<br>
> state_t conn,<br>
> char const *queue_name,<br>
> int rate_limit,<br>
> int message_count)<br>
> {<br>
> char message[256] = "test";<br>
> die_on_error(amqp_basic_publish(conn,<br>
> 2,<br>
> amqp_cstring_bytes("amq.direct"),<br>
> amqp_cstring_bytes(queue_name),<br>
> 0,<br>
> 0,<br>
> NULL,<br>
> (amqp_bytes_t) {.len = sizeof(message),<br>
> .bytes = message}),<br>
> "Publishing");<br>
> printf("Message sent by producer\n");<br>
> }<br>
><br>
> static void run(amqp_connection_state_t conn)<br>
> {<br>
> amqp_frame_t frame;<br>
> int result;<br>
> size_t body_received;<br>
> size_t body_target;<br>
> int received = 0;<br>
> while (1) {<br>
> amqp_maybe_release_buffers(conn);<br>
> result = amqp_simple_wait_frame(conn, &frame);<br>
> if (result <= 0) return;<br>
> if (frame.frame_type != AMQP_FRAME_METHOD)<br>
> continue;<br>
> if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)<br>
> continue;<br>
> result = amqp_simple_wait_frame(conn, &frame);<br>
> if (result <= 0) return;<br>
> if (frame.frame_type != AMQP_FRAME_HEADER) {<br>
> abort();<br>
> }<br>
> body_target = frame.payload.properties.body_size;<br>
> body_received = 0;<br>
> while (body_received < body_target) {<br>
> result = amqp_simple_wait_frame(conn, &frame);<br>
> if (result <= 0) return;<br>
> if (frame.frame_type != AMQP_FRAME_BODY) {<br>
> abort();<br>
> }<br>
> body_received += frame.payload.body_fragment.len;<br>
> assert(body_received <= body_target);<br>
> }<br>
> received++;<br>
> printf("received=%d\n",received);<br>
> }<br>
> }<br>
> void* consume(void* conn){<br>
> run(*((amqp_connection_state_t*)conn));<br>
> }<br>
> int main(int argc, char const * const *argv) {<br>
> char const *hostname="127.0.0.1";<br>
> int port="5672";<br>
> char const *exchange;<br>
> char const *bindingkey;<br>
> int sockfd;<br>
> amqp_connection_state_t conn;<br>
> amqp_bytes_t queuename;<br>
> exchange = "amq.direct"; //argv[3];<br>
> bindingkey = "test queue"; //argv[4];<br>
> int rate_limit = 100;<br>
> int message_count = 10000;<br>
> conn = amqp_new_connection();<br>
> die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");<br>
> amqp_set_sockfd(conn, sockfd);<br>
> die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0,<br>
> AMQP_SASL_METHOD_PLAIN, "guest", "guest"),<br>
> "Logging in");<br>
> amqp_channel_open(conn, 1);<br>
> die_on_amqp_error(amqp_rpc_reply, "Opening channel");<br>
> {<br>
> amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1,<br>
> AMQP_EMPTY_BYTES, 0, 0, 0, 1,<br>
> AMQP_EMPTY_TABLE);<br>
> die_on_amqp_error(amqp_rpc_reply, "Declaring queue");<br>
> queuename = amqp_bytes_malloc_dup(r->queue);<br>
> if (queuename.bytes == NULL) {<br>
> die_on_error(-ENOMEM, "Copying queue name");<br>
> }<br>
> }<br>
> amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange),<br>
> amqp_cstring_bytes(bindingkey),<br>
> AMQP_EMPTY_TABLE);<br>
> die_on_amqp_error(amqp_rpc_reply, "Binding queue");<br>
><br>
> amqp_basic_consume(conn, 1, queuename, AMQP_EMPTY_BYTES, 0, 1, 0);<br>
> die_on_amqp_error(amqp_rpc_reply, "Consuming");<br>
> pthread_t threadId;<br>
> pthread_create(&threadId,NULL,consume,(void*)&conn);<br>
> sleep(1);<br>
> amqp_channel_open(conn, 2);<br>
> send_batch(conn, "test queue", rate_limit, message_count);<br>
> die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),<br>
> "Closing channel");<br>
> die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),<br>
> "Closing connection");<br>
> amqp_destroy_connection(conn);<br>
> die_on_error(close(sockfd), "Closing socket");<br>
> return 0;<br>
> }<br>
><br>
> Please provide me the reasons for blocking.<br>
><br>
> Thanks,<br>
> Ragavendra.<br>
<br>
</div></div>> _______________________________________________<br>
> rabbitmq-discuss mailing list<br>
> <a href="mailto:rabbitmq-discuss@lists.rabbitmq.com">rabbitmq-discuss@lists.rabbitmq.com</a><br>
> <a href="http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss" target="_blank">http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss</a><br>
<br>
<br>
_______________________________________________<br>
rabbitmq-discuss mailing list<br>
<a href="mailto:rabbitmq-discuss@lists.rabbitmq.com">rabbitmq-discuss@lists.rabbitmq.com</a><br>
<a href="http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss" target="_blank">http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss</a><br>
</blockquote></div><br>