Thanks for your response.<br>As per my code each thread is using different channel.<br>May I get any� multi threaded C client samples?<br>Could you please let me know the constraints also?<br><br>thanks,<br>Ragavendra.<br>
<br><br><div class="gmail_quote">On Thu, Feb 11, 2010 at 9:37 PM, Matthew Sackman <span dir="ltr">&lt;<a href="mailto:matthew@lshift.net">matthew@lshift.net</a>&gt;</span> wrote:<br><blockquote class="gmail_quote" style="border-left: 1px solid rgb(204, 204, 204); margin: 0pt 0pt 0pt 0.8ex; padding-left: 1ex;">
Hi,<br>
<br>
I&#39;m afraid the RabbitMQ-C client is highly experimental and not really<br>
receiving much development attention at the moment. We would not<br>
recommend it for production use - it has not been through our QA<br>
processes. I&#39;m fairly sure that you need to make sure that you don&#39;t use<br>
a channel from more than one thread, but I have a feeling there are even<br>
more constraints than that. I&#39;m afraid, if in doubt, don&#39;t use the C<br>
client, at the moment.<br>
<br>
Matthew<br>
<div><div></div><div class="h5"><br>
On Wed, Feb 10, 2010 at 05:41:49PM +0000, raghu a wrote:<br>
&gt; Hi,<br>
&gt;<br>
&gt; I am very new to RabbitMQ. we are using Rabbit MQ C API to talk with the<br>
&gt; broker.<br>
&gt; We are playing with the examples(with few modifications) given by RabbitMQ.<br>
&gt; We created one producer and one consumer with the same connection but<br>
&gt; difference channels.<br>
&gt; In the following code amqp_channel_open(conn, 2) statement is causing the my<br>
&gt; main thread to block<br>
&gt; but if i move this statement before creating of consumer thread then every<br>
&gt; thing seems to be working.<br>
&gt;<br>
&gt; My code is as follows.<br>
&gt; static void send_batch(amqp_connection_<br>
&gt; state_t conn,<br>
&gt; � � � � � � � � � � � �char const *queue_name,<br>
&gt; � � � � � � � � � � � �int rate_limit,<br>
&gt; � � � � � � � � � � � �int message_count)<br>
&gt; {<br>
&gt; � char message[256] = &quot;test&quot;;<br>
&gt; � die_on_error(amqp_basic_publish(conn,<br>
&gt; � � � � � � � � � � � � � � � � � � 2,<br>
&gt; � � � � � � � � � � � � � � � � � � amqp_cstring_bytes(&quot;amq.direct&quot;),<br>
&gt; � � � � � � � � � � � � � � � � � � amqp_cstring_bytes(queue_name),<br>
&gt; � � � � � � � � � � � � � � � � � � 0,<br>
&gt; � � � � � � � � � � � � � � � � � � 0,<br>
&gt; � � � � � � � � � � � � � � � � � � NULL,<br>
&gt; � � � � � � � � � � � � � � � � � � (amqp_bytes_t) {.len = sizeof(message),<br>
&gt; .bytes = message}),<br>
&gt; � � � � � � � � �&quot;Publishing&quot;);<br>
&gt; � � printf(&quot;Message sent by producer\n&quot;);<br>
&gt; }<br>
&gt;<br>
&gt; static void run(amqp_connection_state_t conn)<br>
&gt; {<br>
&gt; � amqp_frame_t frame;<br>
&gt; � int result;<br>
&gt; � size_t body_received;<br>
&gt; � size_t body_target;<br>
&gt; � int �received = 0;<br>
&gt; � while (1) {<br>
&gt; � � amqp_maybe_release_buffers(conn);<br>
&gt; � � result = amqp_simple_wait_frame(conn, &amp;frame);<br>
&gt; � � if (result &lt;= 0) return;<br>
&gt; � � if (frame.frame_type != AMQP_FRAME_METHOD)<br>
&gt; � � � continue;<br>
&gt; � � if (<a href="http://frame.payload.method.id" target="_blank">frame.payload.method.id</a> != AMQP_BASIC_DELIVER_METHOD)<br>
&gt; � � � continue;<br>
&gt; � � result = amqp_simple_wait_frame(conn, &amp;frame);<br>
&gt; � � if (result &lt;= 0) return;<br>
&gt; � � if (frame.frame_type != AMQP_FRAME_HEADER) {<br>
&gt; � � � abort();<br>
&gt; � � }<br>
&gt; � � body_target = frame.payload.properties.body_size;<br>
&gt; � � body_received = 0;<br>
&gt; � � while (body_received &lt; body_target) {<br>
&gt; � � � result = amqp_simple_wait_frame(conn, &amp;frame);<br>
&gt; � � � if (result &lt;= 0) return;<br>
&gt; � � � if (frame.frame_type != AMQP_FRAME_BODY) {<br>
&gt; � � abort();<br>
&gt; � � � }<br>
&gt; � � � body_received += frame.payload.body_fragment.len;<br>
&gt; � � � assert(body_received &lt;= body_target);<br>
&gt; � � }<br>
&gt; � � received++;<br>
&gt; � � printf(&quot;received=%d\n&quot;,received);<br>
&gt; � }<br>
&gt; }<br>
&gt; void* �consume(void* conn){<br>
&gt; � run(*((amqp_connection_state_t*)conn));<br>
&gt; }<br>
&gt; int main(int argc, char const * const *argv) {<br>
&gt; � char const *hostname=&quot;127.0.0.1&quot;;<br>
&gt; � int port=&quot;5672&quot;;<br>
&gt; � char const *exchange;<br>
&gt; � char const *bindingkey;<br>
&gt; � int sockfd;<br>
&gt; � amqp_connection_state_t conn;<br>
&gt; � amqp_bytes_t queuename;<br>
&gt; � exchange = &quot;amq.direct&quot;; //argv[3];<br>
&gt; � bindingkey = &quot;test queue&quot;; //argv[4];<br>
&gt; � int rate_limit = 100;<br>
&gt; � int message_count = 10000;<br>
&gt; � conn = amqp_new_connection();<br>
&gt; � die_on_error(sockfd = amqp_open_socket(hostname, port), &quot;Opening socket&quot;);<br>
&gt; � amqp_set_sockfd(conn, sockfd);<br>
&gt; � die_on_amqp_error(amqp_login(conn, &quot;/&quot;, 0, 131072, 0,<br>
&gt; AMQP_SASL_METHOD_PLAIN, &quot;guest&quot;, &quot;guest&quot;),<br>
&gt; � � � � � � &quot;Logging in&quot;);<br>
&gt; � amqp_channel_open(conn, 1);<br>
&gt; � die_on_amqp_error(amqp_rpc_reply, &quot;Opening channel&quot;);<br>
&gt; � {<br>
&gt; � � amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1,<br>
&gt; AMQP_EMPTY_BYTES, 0, 0, 0, 1,<br>
&gt; � � � � � � � � � � � � � � AMQP_EMPTY_TABLE);<br>
&gt; � � die_on_amqp_error(amqp_rpc_reply, &quot;Declaring queue&quot;);<br>
&gt; � � queuename = amqp_bytes_malloc_dup(r-&gt;queue);<br>
&gt; � � if (queuename.bytes == NULL) {<br>
&gt; � � � die_on_error(-ENOMEM, &quot;Copying queue name&quot;);<br>
&gt; � � }<br>
&gt; � }<br>
&gt; � amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange),<br>
&gt; amqp_cstring_bytes(bindingkey),<br>
&gt; � � � � � AMQP_EMPTY_TABLE);<br>
&gt; � die_on_amqp_error(amqp_rpc_reply, &quot;Binding queue&quot;);<br>
&gt;<br>
&gt; � amqp_basic_consume(conn, 1, queuename, AMQP_EMPTY_BYTES, 0, 1, 0);<br>
&gt; � die_on_amqp_error(amqp_rpc_reply, &quot;Consuming&quot;);<br>
&gt; � pthread_t threadId;<br>
&gt; � pthread_create(&amp;threadId,NULL,consume,(void*)&amp;conn);<br>
&gt; � sleep(1);<br>
&gt; � amqp_channel_open(conn, 2);<br>
&gt; � send_batch(conn, &quot;test queue&quot;, rate_limit, message_count);<br>
&gt; � die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),<br>
&gt; &quot;Closing channel&quot;);<br>
&gt; � die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),<br>
&gt; &quot;Closing connection&quot;);<br>
&gt; � amqp_destroy_connection(conn);<br>
&gt; � die_on_error(close(sockfd), &quot;Closing socket&quot;);<br>
&gt; � return 0;<br>
&gt; }<br>
&gt;<br>
&gt; Please provide me the reasons for blocking.<br>
&gt;<br>
&gt; Thanks,<br>
&gt; Ragavendra.<br>
<br>
</div></div>&gt; _______________________________________________<br>
&gt; rabbitmq-discuss mailing list<br>
&gt; <a href="mailto:rabbitmq-discuss@lists.rabbitmq.com">rabbitmq-discuss@lists.rabbitmq.com</a><br>
&gt; <a href="http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss" target="_blank">http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss</a><br>
<br>
<br>
_______________________________________________<br>
rabbitmq-discuss mailing list<br>
<a href="mailto:rabbitmq-discuss@lists.rabbitmq.com">rabbitmq-discuss@lists.rabbitmq.com</a><br>
<a href="http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss" target="_blank">http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss</a><br>
</blockquote></div><br>