Hi,<br><br>I am very new to RabbitMQ. We are using the RabbitMQ C API to talk to the broker.<br>We are experimenting with the examples (with a few modifications) provided by RabbitMQ.<br>We created one producer and one consumer that share the same connection but use different channels.<br>
In the following code, the amqp_channel_open(conn, 2) call causes my main thread to block,<br>but if I move this call before the creation of the consumer thread, everything seems to work.<br><br>My code is as follows.<br>
<div id=":aj" class="ii gt">static void send_batch(amqp_connection_state_t conn,<br> char const *queue_name,<br> int rate_limit,<br> int message_count)<br>
{<br> char message[256] = "test";<br>
die_on_error(amqp_basic_publish(conn,<br> 2,<br> amqp_cstring_bytes("amq.direct"),<br> amqp_cstring_bytes(queue_name),<br>
0,<br> 0,<br> NULL,<br> (amqp_bytes_t) {.len = sizeof(message), .bytes = message}),<br>
"Publishing");<br> printf("Message sent by producer\n");<br>}<br><br>static void run(amqp_connection_state_t conn)<br>{<br> amqp_frame_t frame;<br> int result;<br> size_t body_received;<br>
size_t body_target;<br> int received = 0; <br> while (1) {<br> amqp_maybe_release_buffers(conn);<br> result = amqp_simple_wait_frame(conn, &frame);<br> if (result <= 0) return;<br> if (frame.frame_type != AMQP_FRAME_METHOD)<br>
continue;<br> if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)<br> continue;<br> result = amqp_simple_wait_frame(conn, &frame);<br>
if (result <= 0) return;<br>
if (frame.frame_type != AMQP_FRAME_HEADER) {<br> abort();<br> }<br> body_target = frame.payload.properties.body_size;<br> body_received = 0;<br> while (body_received < body_target) {<br> result = amqp_simple_wait_frame(conn, &frame);<br>
if (result <= 0) return;<br> if (frame.frame_type != AMQP_FRAME_BODY) {<br> abort();<br> }<br> body_received += frame.payload.body_fragment.len;<br> assert(body_received <= body_target);<br>
}<br> received++;<br> printf("received=%d\n",received);<br> }<br>}<br>void* consume(void* conn){<br> run(*((amqp_connection_state_t*)conn));<br>}<br>int main(int argc, char const * const *argv) {<br>
char const *hostname="127.0.0.1";<br> int port="5672";<br> char const *exchange;<br> char const *bindingkey;<br> int sockfd;<br> amqp_connection_state_t conn;<br> amqp_bytes_t queuename;<br> exchange = "amq.direct"; //argv[3];<br>
bindingkey = "test queue"; //argv[4];<br> int rate_limit = 100;<br> int message_count = 10000;<br> conn = amqp_new_connection();<br> die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");<br>
amqp_set_sockfd(conn, sockfd);<br> die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),<br> "Logging in");<br> amqp_channel_open(conn, 1);<br>
die_on_amqp_error(amqp_rpc_reply, "Opening channel");<br> {<br> amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, AMQP_EMPTY_BYTES, 0, 0, 0, 1,<br> AMQP_EMPTY_TABLE);<br>
die_on_amqp_error(amqp_rpc_reply, "Declaring queue");<br>
queuename = amqp_bytes_malloc_dup(r->queue);<br> if (queuename.bytes == NULL) {<br> die_on_error(-ENOMEM, "Copying queue name");<br> }<br> }<br> amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),<br>
AMQP_EMPTY_TABLE);<br> die_on_amqp_error(amqp_rpc_reply, "Binding queue");<br><br> amqp_basic_consume(conn, 1, queuename, AMQP_EMPTY_BYTES, 0, 1, 0);<br> die_on_amqp_error(amqp_rpc_reply, "Consuming");<br>
pthread_t threadId;<br> pthread_create(&threadId,NULL,consume,(void*)&conn);<br> sleep(1);<br> amqp_channel_open(conn, 2);<br> send_batch(conn, "test queue", rate_limit, message_count);<br> die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");<br>
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");<br> amqp_destroy_connection(conn);<br> die_on_error(close(sockfd), "Closing socket");<br> return 0;<br>}<br>
<br>Please explain why this call blocks.<br><br>Thanks,<br>Ragavendra.<br>
</div>