[rabbitmq-discuss] amqp_channel_open issue

raghu a a.raghu001 at gmail.com
Wed Feb 10 17:41:49 GMT 2010


Hi,

I am very new to RabbitMQ. we are using Rabbit MQ C API to talk with the
broker.
We are playing with the examples(with few modifications) given by RabbitMQ.
We created one producer and one consumer with the same connection but
difference channels.
In the following code amqp_channel_open(conn, 2) statement is causing the my
main thread to block
but if i move this statement before creating of consumer thread then every
thing seems to be working.

My code is as follows.
static void send_batch(amqp_connection_
state_t conn,
                       char const *queue_name,
                       int rate_limit,
                       int message_count)
{
  char message[256] = "test";
  die_on_error(amqp_basic_publish(conn,
                                    2,
                                    amqp_cstring_bytes("amq.direct"),
                                    amqp_cstring_bytes(queue_name),
                                    0,
                                    0,
                                    NULL,
                                    (amqp_bytes_t) {.len = sizeof(message),
.bytes = message}),
                 "Publishing");
    printf("Message sent by producer\n");
}

static void run(amqp_connection_state_t conn)
{
  amqp_frame_t frame;
  int result;
  size_t body_received;
  size_t body_target;
  int  received = 0;
  while (1) {
    amqp_maybe_release_buffers(conn);
    result = amqp_simple_wait_frame(conn, &frame);
    if (result <= 0) return;
    if (frame.frame_type != AMQP_FRAME_METHOD)
      continue;
    if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
      continue;
    result = amqp_simple_wait_frame(conn, &frame);
    if (result <= 0) return;
    if (frame.frame_type != AMQP_FRAME_HEADER) {
      abort();
    }
    body_target = frame.payload.properties.body_size;
    body_received = 0;
    while (body_received < body_target) {
      result = amqp_simple_wait_frame(conn, &frame);
      if (result <= 0) return;
      if (frame.frame_type != AMQP_FRAME_BODY) {
    abort();
      }
      body_received += frame.payload.body_fragment.len;
      assert(body_received <= body_target);
    }
    received++;
    printf("received=%d\n",received);
  }
}
void*  consume(void* conn){
  run(*((amqp_connection_state_t*)conn));
}
int main(int argc, char const * const *argv) {
  char const *hostname="127.0.0.1";
  int port="5672";
  char const *exchange;
  char const *bindingkey;
  int sockfd;
  amqp_connection_state_t conn;
  amqp_bytes_t queuename;
  exchange = "amq.direct"; //argv[3];
  bindingkey = "test queue"; //argv[4];
  int rate_limit = 100;
  int message_count = 10000;
  conn = amqp_new_connection();
  die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
  amqp_set_sockfd(conn, sockfd);
  die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0,
AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
            "Logging in");
  amqp_channel_open(conn, 1);
  die_on_amqp_error(amqp_rpc_reply, "Opening channel");
  {
    amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1,
AMQP_EMPTY_BYTES, 0, 0, 0, 1,
                            AMQP_EMPTY_TABLE);
    die_on_amqp_error(amqp_rpc_reply, "Declaring queue");
    queuename = amqp_bytes_malloc_dup(r->queue);
    if (queuename.bytes == NULL) {
      die_on_error(-ENOMEM, "Copying queue name");
    }
  }
  amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange),
amqp_cstring_bytes(bindingkey),
          AMQP_EMPTY_TABLE);
  die_on_amqp_error(amqp_rpc_reply, "Binding queue");

  amqp_basic_consume(conn, 1, queuename, AMQP_EMPTY_BYTES, 0, 1, 0);
  die_on_amqp_error(amqp_rpc_reply, "Consuming");
  pthread_t threadId;
  pthread_create(&threadId,NULL,consume,(void*)&conn);
  sleep(1);
  amqp_channel_open(conn, 2);
  send_batch(conn, "test queue", rate_limit, message_count);
  die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
"Closing channel");
  die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
"Closing connection");
  amqp_destroy_connection(conn);
  die_on_error(close(sockfd), "Closing socket");
  return 0;
}

Please provide me the reasons for blocking.

Thanks,
Ragavendra.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20100210/f6ed3703/attachment.htm 


More information about the rabbitmq-discuss mailing list