[rabbitmq-discuss] Messages disappearing
Nick Pateman
nick.pateman at certivox.com
Fri Nov 18 15:13:55 GMT 2011
Awesome! You're a star. It's working great now; thanks a million for your time.
-----Original Message-----
From: rabbitmq-discuss-bounces at lists.rabbitmq.com [mailto:rabbitmq-discuss-bounces at lists.rabbitmq.com] On Behalf Of Valentin BERNARD
Sent: 18 November 2011 14:41
To: rabbitmq-discuss at lists.rabbitmq.com
Subject: Re: [rabbitmq-discuss] Messages disappearing
Hi,
You probably shouldn't call basic_consume on your message sending application. If I understand correctly, you're consuming on the queue you're sending messages to - so they just come back at you, and since you have no code to handle them they get lost.
Try removing the following lines:
amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 0, 0, amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn),
"Consuming");
Cheers,
Valentin.
On 18 nov, 14:35, Nick Pateman <nick.pate... at certivox.com> wrote:
> Hey Simon,
>
> I do have message receipt code but I'm not actually running it yet; it's in a completely separate application I'm developing and it's definitely not loaded yet. The code for my message sending application, which is the only one that I am currently running, is...
>
> #include <sys/types.h>
> #include <sys/stat.h>
> #include <stdio.h>
> #include <stdlib.h>
> #include <fcntl.h>
> #include <errno.h>
> #include <unistd.h>
> #include <syslog.h>
> #include <string.h>
> #include <assert.h>
> #include <signal.h>
> #include <stdint.h>
> #include <amqp.h>
> #include <amqp_framing.h>
> #include <assert.h>
> #include <sys/time.h>
> #include <unistd.h>
>
> #define SUMMARY_EVERY_US 1000000
>
> const amqp_bytes_t amqp_empty_bytes = { 0, NULL };
> const amqp_table_t amqp_empty_table = { 0, NULL };
>
> uint64_t now_microseconds(void)
> {
> struct timeval tv;
> gettimeofday(&tv, NULL);
> return (uint64_t) tv.tv_sec * 1000000 + (uint64_t) tv.tv_usec;
>
> }
>
> void microsleep(int usec)
> {
> usleep(usec);
>
> }
>
> void die_on_error(int x, char const *context) {
> if (x < 0) {
> char *errstr = amqp_error_string(-x);
> fprintf(stderr, "%s: %s\n", context, errstr);
> free(errstr);
> exit(1);
> }
>
> }
>
> void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) {
> switch (x.reply_type) {
> case AMQP_RESPONSE_NORMAL:
> return;
>
> case AMQP_RESPONSE_NONE:
> fprintf(stderr, "%s: missing RPC reply type!\n", context);
> break;
>
> case AMQP_RESPONSE_LIBRARY_EXCEPTION:
> fprintf(stderr, "%s: %s\n", context,
> amqp_error_string(x.library_error));
> break;
>
> case AMQP_RESPONSE_SERVER_EXCEPTION:
> switch (x.reply.id) {
> case AMQP_CONNECTION_CLOSE_METHOD: {
> amqp_connection_close_t *m = (amqp_connection_close_t *)
> x.reply.decoded;
> fprintf(stderr, "%s: server connection error %d, message: %.*s\n",
> context,
> m->reply_code,
> (int) m->reply_text.len, (char *) m->reply_text.bytes);
> break;
> }
> case AMQP_CHANNEL_CLOSE_METHOD: {
> amqp_channel_close_t *m = (amqp_channel_close_t *)
> x.reply.decoded;
> fprintf(stderr, "%s: server channel error %d, message: %.*s\n",
> context,
> m->reply_code,
> (int) m->reply_text.len, (char *) m->reply_text.bytes);
> break;
> }
> default:
> fprintf(stderr, "%s: unknown server error, method id 0x%08X\n",
> context, x.reply.id);
> break;
> }
> break;
> }
>
> exit(1);
>
> }
>
> static void dump_row(long count, int numinrow, int *chs) {
> int i;
>
> printf("%08lX:", count - numinrow);
>
> if (numinrow > 0) {
> for (i = 0; i < numinrow; i++) {
> if (i == 8)
> printf(" :");
> printf(" %02X", chs[i]);
> }
> for (i = numinrow; i < 16; i++) {
> if (i == 8)
> printf(" :");
> printf(" ");
> }
> printf(" ");
> for (i = 0; i < numinrow; i++) {
> if (isprint(chs[i]))
> printf("%c", chs[i]);
> else
> printf(".");
> }
> }
> printf("\n");
>
> }
>
> static int rows_eq(int *a, int *b) {
> int i;
>
> for (i=0; i<16; i++)
> if (a[i] != b[i])
> return 0;
>
> return 1;
>
> }
>
> void amqp_dump(void const *buffer, size_t len) {
> unsigned char *buf = (unsigned char *) buffer;
> long count = 0;
> int numinrow = 0;
> int chs[16];
> int oldchs[16];
> int showed_dots = 0;
> int i;
>
> for (i = 0; i < len; i++) {
> int ch = buf[i];
>
> if (numinrow == 16) {
> int i;
>
> if (rows_eq(oldchs, chs)) {
> if (!showed_dots) {
> showed_dots = 1;
> printf(" .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n");
> }
> } else {
> showed_dots = 0;
> dump_row(count, numinrow, chs);
> }
>
> for (i=0; i<16; i++)
> oldchs[i] = chs[i];
>
> numinrow = 0;
> }
>
> count++;
> chs[numinrow++] = ch;
> }
>
> dump_row(count, numinrow, chs);
>
> if (numinrow != 0)
> printf("%08lX:\n", count);
>
> }
>
> /**************************************************************************
> Function: main
>
> Description:
> The C standard 'main' entry point function.
>
> Returns:
> Returns an integer which is passed back to the parent process.
> **************************************************************************/
> int main(int argc, char *argv[]) {
>
> char const *hostname;
> int port;
> char const *exchange;
> char const *bindingkey;
>
> int sockfd;
> amqp_connection_state_t conn;
>
> amqp_bytes_t queuename = amqp_cstring_bytes("testing");
>
> hostname = "localhost";
> port = 5672;
>
> conn = amqp_new_connection();
>
> die_on_error(sockfd = amqp_open_socket(hostname, port),
> "Opening socket");
>
> amqp_set_sockfd(conn, sockfd);
> die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0,
> AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
> "Logging in");
>
> amqp_channel_open(conn, 1);
> die_on_amqp_error(amqp_get_rpc_reply(conn),
> "Opening channel");
>
> amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 0, 0,
> amqp_empty_table);
> die_on_amqp_error(amqp_get_rpc_reply(conn),
> "Consuming");
>
> //****************************************************
> // TODO: Insert core of your message queueing code here
> //****************************************************
> send_batch(conn, (char*)queuename.bytes, 10, 1000);
>
> //****************************************************
> // TODO: Free any allocated resources before exiting
> //****************************************************
> die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
> "Closing channel");
>
> die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
> "Closing connection");
>
> die_on_error(amqp_destroy_connection(conn),
> "Ending connection");
>
> exit(0);
>
> }
>
> void send_batch(amqp_connection_state_t conn,
> char const *queue_name,
> int rate_limit,
> int message_count) {
> uint64_t start_time = now_microseconds();
> int i;
> int sent = 0;
> int previous_sent = 0;
> uint64_t previous_report_time = start_time;
> uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
>
> for (i = 0; i < message_count; i++) {
> uint64_t now = now_microseconds();
>
> amqp_basic_properties_t props;
> props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
> AMQP_BASIC_DELIVERY_MODE_FLAG;
> props.content_type = amqp_cstring_bytes("text/plain");
> props.delivery_mode = 2; /* persistent delivery mode */
> die_on_error(amqp_basic_publish(conn,
> 1,
> amqp_cstring_bytes(""),
> amqp_cstring_bytes(queue_name),
> 0,
> 0,
> &props,
> amqp_cstring_bytes("Hello world!")),
> "Publishing");
>
> sent++;
> if (now > next_summary_time) {
> int countOverInterval = sent - previous_sent;
> double intervalRate = countOverInterval / ((now -
> previous_report_time) / 1000000.0);
> printf("%d ms: Sent %d - %d since last report (%d Hz)\n",
> (int)(now - start_time) / 1000, sent, countOverInterval,
> (int) intervalRate);
>
> previous_sent = sent;
> previous_report_time = now;
> next_summary_time += SUMMARY_EVERY_US;
> }
>
> while (((i * 1000000.0) / (now - start_time)) > rate_limit) {
> microsleep(2000);
> now = now_microseconds();
> }
> }
>
> {
> uint64_t stop_time = now_microseconds();
> int total_delta = stop_time - start_time;
>
> printf("PRODUCER - Message count: %d\n", message_count);
> printf("Total time, milliseconds: %d\n", total_delta / 1000);
> printf("Overall messages-per-second: %g\n", (message_count /
> (total_delta / 1000000.0)));
> }
>
>
>
> }
> -----Original Message-----
> From: rabbitmq-discuss-boun... at lists.rabbitmq.com
> [mailto:rabbitmq-discuss-boun... at lists.rabbitmq.com] On Behalf Of
> Simon MacMullen
> Sent: 17 November 2011 15:44
> To: rabbitmq-disc... at lists.rabbitmq.com
> Subject: Re: [rabbitmq-discuss] Messages disappearing
>
> On 17/11/11 15:38, Nick Pateman wrote:
> > I've set no_ack to 0 in my amq_basic_consume call and the messages
> > are now appearing in the queue but as unacknowledged.
>
> So all this sounds like the messages are being sent to your application:
>
> no_ack = 1 -> messages are sent to your app, RabbitMQ forgets about them immediately.
>
> no_ack = 0 -> messages are sent to your app, RabbitMQ treats them as unacknowledged until you ack.
>
> The source code you posted doesn't show any code that actually handles received message frames - you do have some, right?
>
> Can you post your whole source somewhere? It's quite difficult to tell what is going on looking at fragments.
>
> Cheers, Simon
>
> --
> Simon MacMullen
> RabbitMQ, VMware
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-disc... at lists.rabbitmq.com
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-disc... at lists.rabbitmq.com
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
_______________________________________________
rabbitmq-discuss mailing list
rabbitmq-discuss at lists.rabbitmq.com
https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
More information about the rabbitmq-discuss mailing list