Hi,<br><br>I am very new to RabbitMQ. we are using Rabbit MQ C API to talk with the broker.<br>We are playing with the examples(with few modifications) given by RabbitMQ. <br>We created one producer and one consumer with the same connection but difference channels.<br>
In the following code amqp_channel_open(conn, 2) statement is causing the my main thread to block <br>but if i move this statement before creating of consumer thread then every thing seems to be working.<br><br>My code is as follows.<br>
static void send_batch(amqp_connection_<div id=":aj" class="ii gt">state_t conn,<br>���������������������� char const *queue_name,<br>���������������������� int rate_limit,<br>���������������������� int message_count)<br>
{<br>� char message[256] = "test";<br>
� die_on_error(amqp_basic_publish(conn,<br>����������������������������������� 2,<br>����������������������������������� amqp_cstring_bytes("amq.direct"),<br>����������������������������������� amqp_cstring_bytes(queue_name),<br>
����������������������������������� 0,<br>����������������������������������� 0,<br>����������������������������������� NULL,<br>����������������������������������� (amqp_bytes_t) {.len = sizeof(message), .bytes = message}),<br>
���������������� "Publishing");<br>��� printf("Message sent by producer\n");<br>}<br><br>static void run(amqp_connection_state_t conn)<br>{<br>� amqp_frame_t frame;<br>� int result;<br>� size_t body_received;<br>
� size_t body_target;<br>� int� received = 0;��� <br>� while (1) {<br>��� amqp_maybe_release_buffers(conn);<br>��� result = amqp_simple_wait_frame(conn, &frame);<br>��� if (result <= 0) return;<br>��� if (frame.frame_type != AMQP_FRAME_METHOD)<br>
����� continue;<br>��� if (<a href="http://frame.payload.method.id/" target="_blank">frame.payload.method.id</a> != AMQP_BASIC_DELIVER_METHOD)<br>����� continue;<br>��� result = amqp_simple_wait_frame(conn, &frame);<br>
��� if (result <= 0) return;<br>
��� if (frame.frame_type != AMQP_FRAME_HEADER) {<br>����� abort();<br>��� }<br>��� body_target = frame.payload.properties.body_size;<br>��� body_received = 0;<br>��� while (body_received < body_target) {<br>����� result = amqp_simple_wait_frame(conn, &frame);<br>
����� if (result <= 0) return;<br>����� if (frame.frame_type != AMQP_FRAME_BODY) {<br>��� abort();<br>����� }<br>����� body_received += frame.payload.body_fragment.len;<br>����� assert(body_received <= body_target);<br>
��� }<br>��� received++;<br>��� printf("received=%d\n",received);<br>� }<br>}<br>void*� consume(void* conn){<br>� run(*((amqp_connection_state_t*)conn));<br>}<br>int main(int argc, char const * const *argv) {<br>
� char const *hostname="127.0.0.1";<br>� int port="5672";<br>� char const *exchange;<br>� char const *bindingkey;<br>� int sockfd;<br>� amqp_connection_state_t conn;<br>� amqp_bytes_t queuename;<br>� exchange = "amq.direct"; //argv[3];<br>
� bindingkey = "test queue"; //argv[4];<br>� int rate_limit = 100;<br>� int message_count = 10000;<br>� conn = amqp_new_connection();<br>� die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");<br>
� amqp_set_sockfd(conn, sockfd);<br>� die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),<br>��� ��� ��� "Logging in");<br>� amqp_channel_open(conn, 1);<br>
� die_on_amqp_error(amqp_rpc_reply, "Opening channel");<br>� {<br>��� amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, AMQP_EMPTY_BYTES, 0, 0, 0, 1,<br>��� ��� ��� ��� ��� ��� ��� AMQP_EMPTY_TABLE);<br>
��� die_on_amqp_error(amqp_rpc_reply, "Declaring queue");<br>
��� queuename = amqp_bytes_malloc_dup(r->queue);<br>��� if (queuename.bytes == NULL) {<br>����� die_on_error(-ENOMEM, "Copying queue name");<br>��� }<br>� }<br>� amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),<br>
��� ��� � AMQP_EMPTY_TABLE);<br>� die_on_amqp_error(amqp_rpc_reply, "Binding queue");<br><br>� amqp_basic_consume(conn, 1, queuename, AMQP_EMPTY_BYTES, 0, 1, 0);<br>� die_on_amqp_error(amqp_rpc_reply, "Consuming");<br>
� pthread_t threadId;<br>� pthread_create(&threadId,NULL,consume,(void*)&conn);<br>� sleep(1);<br>� amqp_channel_open(conn, 2);<br>� send_batch(conn, "test queue", rate_limit, message_count);<br>� die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");<br>
� die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");<br>� amqp_destroy_connection(conn);<br>� die_on_error(close(sockfd), "Closing socket");<br>� return 0;<br>}<br>
<br>Please provide me the reasons for blocking.<br><br>Thanks,<br>Ragavendra.<br>
</div>