[rabbitmq-discuss] Messages disappearing

Valentin BERNARD vbernard42 at gmail.com
Fri Nov 18 14:40:55 GMT 2011


Hi,

You probably shouldn't call basic_consume on your message sending
application. If I understand correctly, you're consuming on the queue
you're sending messages to — so they just come back at you, and since
you have no code to handle them they get lost.

Try removing the following lines:
   amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 0, 0,
amqp_empty_table);
   die_on_amqp_error(amqp_get_rpc_reply(conn),
                 "Consuming");

Cheers,

Valentin.


On 18 nov, 14:35, Nick Pateman <nick.pate... at certivox.com> wrote:
> Hey Simon,
>
> I do have message receipt code but I'm not actually running it yet, it's in a completely separate application I'm developing and It's definitely not loaded yet.  The code for my message sending application, which is the only one that I am currently running at the moment is....
>
> #include <sys/types.h>
> #include <sys/stat.h>
> #include <stdio.h>
> #include <stdlib.h>
> #include <fcntl.h>
> #include <errno.h>
> #include <unistd.h>
> #include <syslog.h>
> #include <string.h>
> #include <assert.h>
> #include <signal.h>
> #include <stdint.h>
> #include <amqp.h>
> #include <amqp_framing.h>
> #include <assert.h>
> #include <sys/time.h>
> #include <unistd.h>
>
> #define SUMMARY_EVERY_US 1000000
>
> const amqp_bytes_t amqp_empty_bytes = { 0, NULL };
> const amqp_table_t amqp_empty_table = { 0, NULL };
>
> uint64_t now_microseconds(void)
> {
>   struct timeval tv;
>   gettimeofday(&tv, NULL);
>   return (uint64_t) tv.tv_sec * 1000000 + (uint64_t) tv.tv_usec;
>
> }
>
> void microsleep(int usec)
> {
>   usleep(usec);
>
> }
>
> void die_on_error(int x, char const *context) {
>   if (x < 0) {
>     char *errstr = amqp_error_string(-x);
>     fprintf(stderr, "%s: %s\n", context, errstr);
>     free(errstr);
>     exit(1);
>   }
>
> }
>
> void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) {
>   switch (x.reply_type) {
>     case AMQP_RESPONSE_NORMAL:
>       return;
>
>     case AMQP_RESPONSE_NONE:
>       fprintf(stderr, "%s: missing RPC reply type!\n", context);
>       break;
>
>     case AMQP_RESPONSE_LIBRARY_EXCEPTION:
>       fprintf(stderr, "%s: %s\n", context, amqp_error_string(x.library_error));
>       break;
>
>     case AMQP_RESPONSE_SERVER_EXCEPTION:
>       switch (x.reply.id) {
>         case AMQP_CONNECTION_CLOSE_METHOD: {
>           amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded;
>           fprintf(stderr, "%s: server connection error %d, message: %.*s\n",
>                   context,
>                   m->reply_code,
>                   (int) m->reply_text.len, (char *) m->reply_text.bytes);
>           break;  FILE* pFilFile = fopen("cekencapsulatereq.js", "r");
>         }
>         case AMQP_CHANNEL_CLOSE_METHOD: {
>           amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
>           fprintf(stderr, "%s: server channel error %d, message: %.*s\n",
>                   context,
>                   m->reply_code,
>                   (int) m->reply_text.len, (char *) m->reply_text.bytes);
>           break;
>         }
>         default:
>           fprintf(stderr, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id);
>           break;
>       }
>       break;
>   }
>
>   exit(1);
>
> }
>
> static void dump_row(long count, int numinrow, int *chs) {
>   int i;
>
>   printf("%08lX:", count - numinrow);
>
>   if (numinrow > 0) {
>     for (i = 0; i < numinrow; i++) {
>       if (i == 8)
>         printf(" :");
>       printf(" %02X", chs[i]);
>     }
>     for (i = numinrow; i < 16; i++) {
>       if (i == 8)
>         printf(" :");
>       printf("   ");
>     }
>     printf("  ");
>     for (i = 0; i < numinrow; i++) {
>       if (isprint(chs[i]))
>         printf("%c", chs[i]);
>       else
>         printf(".");
>     }
>   }
>   printf("\n");
>
> }
>
> static int rows_eq(int *a, int *b) {
>   int i;
>
>   for (i=0; i<16; i++)
>     if (a[i] != b[i])
>       return 0;
>
>   return 1;
>
> }
>
> void amqp_dump(void const *buffer, size_t len) {
>   unsigned char *buf = (unsigned char *) buffer;
>   long count = 0;
>   int numinrow = 0;
>   int chs[16];
>   int oldchs[16];
>   int showed_dots = 0;
>   int i;
>
>   for (i = 0; i < len; i++) {
>     int ch = buf[i];
>
>     if (numinrow == 16) {
>       int i;
>
>       if (rows_eq(oldchs, chs)) {
>         if (!showed_dots) {
>           showed_dots = 1;
>           printf("          .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n");
>         }
>       } else {
>         showed_dots = 0;
>         dump_row(count, numinrow, chs);
>       }
>
>       for (i=0; i<16; i++)
>         oldchs[i] = chs[i];
>
>       numinrow = 0;
>     }
>
>     count++;
>     chs[numinrow++] = ch;
>   }
>
>   dump_row(count, numinrow, chs);
>
>   if (numinrow != 0)
>     printf("%08lX:\n", count);
>
> }
>
> /**************************************************************************
>     Function: main
>
>     Description:
>         The c standard 'main' entry point function.
>
>     Returns:
>         returns integer which is passed back to the parent process
> **************************************************************************/
> int main(int argc, char *argv[]) {
>
>     char const *hostname;
>     int port;
>     char const *exchange;
>     char const *bindingkey;
>
>     int sockfd;
>     amqp_connection_state_t conn;
>
>     amqp_bytes_t queuename = amqp_cstring_bytes("testing");
>
>     hostname = "localhost";
>     port = 5672;
>
>     conn = amqp_new_connection();
>
>     die_on_error(sockfd = amqp_open_socket(hostname, port),
>                 "Opening socket");
>
>     amqp_set_sockfd(conn, sockfd);
>     die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
>                     "Logging in");
>
>     amqp_channel_open(conn, 1);
>     die_on_amqp_error(amqp_get_rpc_reply(conn),
>                 "Opening channel");
>
>     amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 0, 0, amqp_empty_table);
>     die_on_amqp_error(amqp_get_rpc_reply(conn),
>                 "Consuming");
>
>     //****************************************************
>     // TODO: Insert core of your message queueing code here
>     //****************************************************
>     send_batch(conn, (char*)queuename.bytes, 10, 1000);
>
>     //****************************************************
>     // TODO: Free any allocated resources before exiting
>     //****************************************************
>     die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
>                 "Closing channel");
>
>     die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
>                 "Closing connection");
>
>     die_on_error(amqp_destroy_connection(conn),
>                 "Ending connection");
>
>     exit(0);
>
> }
>
> void send_batch(amqp_connection_state_t conn,
>                        char const *queue_name,
>                        int rate_limit,
>                        int message_count)
> {
>   uint64_t start_time = now_microseconds();
>   int i;
>   int sent = 0;
>   int previous_sent = 0;
>   uint64_t previous_report_time = start_time;
>   uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
>
>   for (i = 0; i < message_count; i++) {
>     uint64_t now = now_microseconds();
>
>     amqp_basic_properties_t props;
>     props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
>     props.content_type = amqp_cstring_bytes("text/plain");
>     props.delivery_mode = 2; /* persistent delivery mode */
>     die_on_error(amqp_basic_publish(conn,
>                 1,
>                 amqp_cstring_bytes(""),
>                 amqp_cstring_bytes(queue_name),
>                 0,
>                 0,
>                 &props,
>                 amqp_cstring_bytes("Hello world!")),
>                 "Publishing");
>
>     sent++;
>     if (now > next_summary_time) {
>       int countOverInterval = sent - previous_sent;
>       double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);
>       printf("%d ms: Sent %d - %d since last report (%d Hz)\n",
>              (int)(now - start_time) / 1000, sent, countOverInterval, (int) intervalRate);
>
>       previous_sent = sent;
>       previous_report_time = now;
>       next_summary_time += SUMMARY_EVERY_US;
>     }
>
>     while (((i * 1000000.0) / (now - start_time)) > rate_limit) {
>       microsleep(20  FILE* pFilFile = fopen("cekencapsulatereq.js", "r");00);
>       now = now_microseconds();
>     }
>   }
>
>   {
>     uint64_t stop_time = now_microseconds();
>     int total_delta = stop_time - start_time;
>
>     printf("PRODUCER - Message count: %d\n", message_count);
>     printf("Total time, milliseconds: %d\n", total_delta / 1000);
>     printf("Overall messages-per-second: %g\n", (message_count / (total_delta / 1000000.0)));
>   }
>
>
>
> }
> -----Original Message-----
> From: rabbitmq-discuss-boun... at lists.rabbitmq.com [mailto:rabbitmq-discuss-boun... at lists.rabbitmq.com] On Behalf Of Simon MacMullen
> Sent: 17 November 2011 15:44
> To: rabbitmq-disc... at lists.rabbitmq.com
> Subject: Re: [rabbitmq-discuss] Messages disappearing
>
> On 17/11/11 15:38, Nick Pateman wrote:
> > I've set no_ack to 0 in my amq_basic_consume call and the messages are
> > now appearing in the queue but as unacknowledged.
>
> So all this sounds like the messages are being sent to your application:
>
> no_ack = 1 -> messages are sent to your app, RabbitMQ forgets about them immediately.
>
> no_ack = 0 -> messages are sent to your app, RabbitMQ treats them as unacknowledged until you ack.
>
> The source code you posted doesn't show any code that actually handles received message frames - you do have some right?
>
> Can you post your whole source somewhere? It's quite difficult to tell what is going on looking at fragments.
>
> Cheers, Simon
>
> --
> Simon MacMullen
> RabbitMQ, VMware
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-disc... at lists.rabbitmq.comhttps://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-disc... at lists.rabbitmq.comhttps://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss


More information about the rabbitmq-discuss mailing list