[rabbitmq-discuss] Messages disappearing

Nick Pateman nick.pateman at certivox.com
Fri Nov 18 13:35:46 GMT 2011


Hey Simon,

I do have message receipt code but I'm not actually running it yet, it's in a completely separate application I'm developing and It's definitely not loaded yet.  The code for my message sending application, which is the only one that I am currently running at the moment is....


#include <sys/types.h>
#include <sys/stat.h>
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <syslog.h>
#include <string.h>
#include <assert.h>
#include <signal.h>
#include <stdint.h>
#include <amqp.h>
#include <amqp_framing.h>
#include <assert.h>
#include <sys/time.h>
#include <unistd.h>

#define SUMMARY_EVERY_US 1000000

const amqp_bytes_t amqp_empty_bytes = { 0, NULL };
const amqp_table_t amqp_empty_table = { 0, NULL };

uint64_t now_microseconds(void)
{
  struct timeval tv;
  gettimeofday(&tv, NULL);
  return (uint64_t) tv.tv_sec * 1000000 + (uint64_t) tv.tv_usec;
}

void microsleep(int usec)
{
  usleep(usec);
}

void die_on_error(int x, char const *context) {
  if (x < 0) {
    char *errstr = amqp_error_string(-x);
    fprintf(stderr, "%s: %s\n", context, errstr);
    free(errstr);
    exit(1);
  }
}

void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) {
  switch (x.reply_type) {
    case AMQP_RESPONSE_NORMAL:
      return;

    case AMQP_RESPONSE_NONE:
      fprintf(stderr, "%s: missing RPC reply type!\n", context);
      break;

    case AMQP_RESPONSE_LIBRARY_EXCEPTION:
      fprintf(stderr, "%s: %s\n", context, amqp_error_string(x.library_error));
      break;

    case AMQP_RESPONSE_SERVER_EXCEPTION:
      switch (x.reply.id) {
	case AMQP_CONNECTION_CLOSE_METHOD: {
	  amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded;
	  fprintf(stderr, "%s: server connection error %d, message: %.*s\n",
		  context,
		  m->reply_code,
		  (int) m->reply_text.len, (char *) m->reply_text.bytes);
	  break;  FILE* pFilFile = fopen("cekencapsulatereq.js", "r");
	}
	case AMQP_CHANNEL_CLOSE_METHOD: {
	  amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
	  fprintf(stderr, "%s: server channel error %d, message: %.*s\n",
		  context,
		  m->reply_code,
		  (int) m->reply_text.len, (char *) m->reply_text.bytes);
	  break;
	}
	default:
	  fprintf(stderr, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id);
	  break;
      }
      break;
  }

  exit(1);
}

static void dump_row(long count, int numinrow, int *chs) {
  int i;

  printf("%08lX:", count - numinrow);

  if (numinrow > 0) {
    for (i = 0; i < numinrow; i++) {
      if (i == 8)
	printf(" :");
      printf(" %02X", chs[i]);
    }
    for (i = numinrow; i < 16; i++) {
      if (i == 8)
	printf(" :");
      printf("   ");
    }
    printf("  ");
    for (i = 0; i < numinrow; i++) {
      if (isprint(chs[i]))
	printf("%c", chs[i]);
      else
	printf(".");
    }
  }
  printf("\n");
}

static int rows_eq(int *a, int *b) {
  int i;

  for (i=0; i<16; i++)
    if (a[i] != b[i])
      return 0;

  return 1;
}

void amqp_dump(void const *buffer, size_t len) {
  unsigned char *buf = (unsigned char *) buffer;
  long count = 0;
  int numinrow = 0;
  int chs[16];
  int oldchs[16];
  int showed_dots = 0;
  int i;

  for (i = 0; i < len; i++) {
    int ch = buf[i];

    if (numinrow == 16) {
      int i;

      if (rows_eq(oldchs, chs)) {
	if (!showed_dots) {
	  showed_dots = 1;
	  printf("          .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n");
	}
      } else {
	showed_dots = 0;
	dump_row(count, numinrow, chs);
      }

      for (i=0; i<16; i++)
	oldchs[i] = chs[i];

      numinrow = 0;
    }

    count++;
    chs[numinrow++] = ch;
  }

  dump_row(count, numinrow, chs);

  if (numinrow != 0)
    printf("%08lX:\n", count);
}

/**************************************************************************
    Function: main

    Description:
        The c standard 'main' entry point function.

    Returns:
        returns integer which is passed back to the parent process
**************************************************************************/
int main(int argc, char *argv[]) {

    char const *hostname;
    int port;
    char const *exchange;
    char const *bindingkey;

    int sockfd;
    amqp_connection_state_t conn;

    amqp_bytes_t queuename = amqp_cstring_bytes("testing");

    hostname = "localhost";
    port = 5672;

    conn = amqp_new_connection();

    die_on_error(sockfd = amqp_open_socket(hostname, port),
    		"Opening socket");

    amqp_set_sockfd(conn, sockfd);
    die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
  		    "Logging in");

    amqp_channel_open(conn, 1);
    die_on_amqp_error(amqp_get_rpc_reply(conn),
    		"Opening channel");

    amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 0, 0, amqp_empty_table);
    die_on_amqp_error(amqp_get_rpc_reply(conn),
    		"Consuming");

    //****************************************************
    // TODO: Insert core of your message queueing code here
    //****************************************************
    send_batch(conn, (char*)queuename.bytes, 10, 1000);

    //****************************************************
    // TODO: Free any allocated resources before exiting
    //****************************************************
    die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
    		"Closing channel");

    die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
    		"Closing connection");

    die_on_error(amqp_destroy_connection(conn),
    		"Ending connection");

    exit(0);
}

void send_batch(amqp_connection_state_t conn,
		       char const *queue_name,
		       int rate_limit,
		       int message_count)
{
  uint64_t start_time = now_microseconds();
  int i;
  int sent = 0;
  int previous_sent = 0;
  uint64_t previous_report_time = start_time;
  uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;

  for (i = 0; i < message_count; i++) {
    uint64_t now = now_microseconds();

    amqp_basic_properties_t props;
    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
    props.content_type = amqp_cstring_bytes("text/plain");
    props.delivery_mode = 2; /* persistent delivery mode */
    die_on_error(amqp_basic_publish(conn,
    		1,
    		amqp_cstring_bytes(""),
    		amqp_cstring_bytes(queue_name),
    		0,
    		0,
    		&props,
    		amqp_cstring_bytes("Hello world!")),
    		"Publishing");

    sent++;
    if (now > next_summary_time) {
      int countOverInterval = sent - previous_sent;
      double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);
      printf("%d ms: Sent %d - %d since last report (%d Hz)\n",
	     (int)(now - start_time) / 1000, sent, countOverInterval, (int) intervalRate);

      previous_sent = sent;
      previous_report_time = now;
      next_summary_time += SUMMARY_EVERY_US;
    }

    while (((i * 1000000.0) / (now - start_time)) > rate_limit) {
      microsleep(20  FILE* pFilFile = fopen("cekencapsulatereq.js", "r");00);
      now = now_microseconds();
    }
  }

  {
    uint64_t stop_time = now_microseconds();
    int total_delta = stop_time - start_time;

    printf("PRODUCER - Message count: %d\n", message_count);
    printf("Total time, milliseconds: %d\n", total_delta / 1000);
    printf("Overall messages-per-second: %g\n", (message_count / (total_delta / 1000000.0)));
  }
}



-----Original Message-----
From: rabbitmq-discuss-bounces at lists.rabbitmq.com [mailto:rabbitmq-discuss-bounces at lists.rabbitmq.com] On Behalf Of Simon MacMullen
Sent: 17 November 2011 15:44
To: rabbitmq-discuss at lists.rabbitmq.com
Subject: Re: [rabbitmq-discuss] Messages disappearing

On 17/11/11 15:38, Nick Pateman wrote:
> I've set no_ack to 0 in my amq_basic_consume call and the messages are 
> now appearing in the queue but as unacknowledged.

So all this sounds like the messages are being sent to your application:

no_ack = 1 -> messages are sent to your app, RabbitMQ forgets about them immediately.

no_ack = 0 -> messages are sent to your app, RabbitMQ treats them as unacknowledged until you ack.

The source code you posted doesn't show any code that actually handles received message frames - you do have some right?

Can you post your whole source somewhere? It's quite difficult to tell what is going on looking at fragments.

Cheers, Simon

--
Simon MacMullen
RabbitMQ, VMware
_______________________________________________
rabbitmq-discuss mailing list
rabbitmq-discuss at lists.rabbitmq.com
https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss




More information about the rabbitmq-discuss mailing list