[rabbitmq-discuss] amqp_channel_open issue
Matthew Sackman
matthew at lshift.net
Thu Feb 11 16:07:33 GMT 2010
Hi,
I'm afraid the RabbitMQ-C client is highly experimental and not really
receiving much development attention at the moment. We would not
recommend it for production use - it has not been through our QA
processes. I'm fairly sure that you need to make sure that you don't use
a channel from more than one thread, but I have a feeling there are even
more constraints than that. I'm afraid, if in doubt, don't use the C
client, at the moment.
Matthew
On Wed, Feb 10, 2010 at 05:41:49PM +0000, raghu a wrote:
> Hi,
>
> I am very new to RabbitMQ. we are using Rabbit MQ C API to talk with the
> broker.
> We are playing with the examples(with few modifications) given by RabbitMQ.
> We created one producer and one consumer with the same connection but
> difference channels.
> In the following code amqp_channel_open(conn, 2) statement is causing the my
> main thread to block
> but if i move this statement before creating of consumer thread then every
> thing seems to be working.
>
> My code is as follows.
> static void send_batch(amqp_connection_
> state_t conn,
> char const *queue_name,
> int rate_limit,
> int message_count)
> {
> char message[256] = "test";
> die_on_error(amqp_basic_publish(conn,
> 2,
> amqp_cstring_bytes("amq.direct"),
> amqp_cstring_bytes(queue_name),
> 0,
> 0,
> NULL,
> (amqp_bytes_t) {.len = sizeof(message),
> .bytes = message}),
> "Publishing");
> printf("Message sent by producer\n");
> }
>
> static void run(amqp_connection_state_t conn)
> {
> amqp_frame_t frame;
> int result;
> size_t body_received;
> size_t body_target;
> int received = 0;
> while (1) {
> amqp_maybe_release_buffers(conn);
> result = amqp_simple_wait_frame(conn, &frame);
> if (result <= 0) return;
> if (frame.frame_type != AMQP_FRAME_METHOD)
> continue;
> if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
> continue;
> result = amqp_simple_wait_frame(conn, &frame);
> if (result <= 0) return;
> if (frame.frame_type != AMQP_FRAME_HEADER) {
> abort();
> }
> body_target = frame.payload.properties.body_size;
> body_received = 0;
> while (body_received < body_target) {
> result = amqp_simple_wait_frame(conn, &frame);
> if (result <= 0) return;
> if (frame.frame_type != AMQP_FRAME_BODY) {
> abort();
> }
> body_received += frame.payload.body_fragment.len;
> assert(body_received <= body_target);
> }
> received++;
> printf("received=%d\n",received);
> }
> }
> void* consume(void* conn){
> run(*((amqp_connection_state_t*)conn));
> }
> int main(int argc, char const * const *argv) {
> char const *hostname="127.0.0.1";
> int port="5672";
> char const *exchange;
> char const *bindingkey;
> int sockfd;
> amqp_connection_state_t conn;
> amqp_bytes_t queuename;
> exchange = "amq.direct"; //argv[3];
> bindingkey = "test queue"; //argv[4];
> int rate_limit = 100;
> int message_count = 10000;
> conn = amqp_new_connection();
> die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
> amqp_set_sockfd(conn, sockfd);
> die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0,
> AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
> "Logging in");
> amqp_channel_open(conn, 1);
> die_on_amqp_error(amqp_rpc_reply, "Opening channel");
> {
> amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1,
> AMQP_EMPTY_BYTES, 0, 0, 0, 1,
> AMQP_EMPTY_TABLE);
> die_on_amqp_error(amqp_rpc_reply, "Declaring queue");
> queuename = amqp_bytes_malloc_dup(r->queue);
> if (queuename.bytes == NULL) {
> die_on_error(-ENOMEM, "Copying queue name");
> }
> }
> amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange),
> amqp_cstring_bytes(bindingkey),
> AMQP_EMPTY_TABLE);
> die_on_amqp_error(amqp_rpc_reply, "Binding queue");
>
> amqp_basic_consume(conn, 1, queuename, AMQP_EMPTY_BYTES, 0, 1, 0);
> die_on_amqp_error(amqp_rpc_reply, "Consuming");
> pthread_t threadId;
> pthread_create(&threadId,NULL,consume,(void*)&conn);
> sleep(1);
> amqp_channel_open(conn, 2);
> send_batch(conn, "test queue", rate_limit, message_count);
> die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
> "Closing channel");
> die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
> "Closing connection");
> amqp_destroy_connection(conn);
> die_on_error(close(sockfd), "Closing socket");
> return 0;
> }
>
> Please provide me the reasons for blocking.
>
> Thanks,
> Ragavendra.
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss at lists.rabbitmq.com
> http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
More information about the rabbitmq-discuss
mailing list