[rabbitmq-discuss] Messages disappearing
Nick Pateman
nick.pateman at certivox.com
Fri Nov 18 13:35:46 GMT 2011
Hey Simon,
I do have message receipt code but I'm not actually running it yet, it's in a completely separate application I'm developing and It's definitely not loaded yet. The code for my message sending application, which is the only one that I am currently running at the moment is....
#include <sys/types.h>
#include <sys/stat.h>
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <syslog.h>
#include <string.h>
#include <assert.h>
#include <signal.h>
#include <stdint.h>
#include <amqp.h>
#include <amqp_framing.h>
#include <assert.h>
#include <sys/time.h>
#include <unistd.h>
#define SUMMARY_EVERY_US 1000000
const amqp_bytes_t amqp_empty_bytes = { 0, NULL };
const amqp_table_t amqp_empty_table = { 0, NULL };
uint64_t now_microseconds(void)
{
struct timeval tv;
gettimeofday(&tv, NULL);
return (uint64_t) tv.tv_sec * 1000000 + (uint64_t) tv.tv_usec;
}
void microsleep(int usec)
{
usleep(usec);
}
void die_on_error(int x, char const *context) {
if (x < 0) {
char *errstr = amqp_error_string(-x);
fprintf(stderr, "%s: %s\n", context, errstr);
free(errstr);
exit(1);
}
}
void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) {
switch (x.reply_type) {
case AMQP_RESPONSE_NORMAL:
return;
case AMQP_RESPONSE_NONE:
fprintf(stderr, "%s: missing RPC reply type!\n", context);
break;
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
fprintf(stderr, "%s: %s\n", context, amqp_error_string(x.library_error));
break;
case AMQP_RESPONSE_SERVER_EXCEPTION:
switch (x.reply.id) {
case AMQP_CONNECTION_CLOSE_METHOD: {
amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded;
fprintf(stderr, "%s: server connection error %d, message: %.*s\n",
context,
m->reply_code,
(int) m->reply_text.len, (char *) m->reply_text.bytes);
break; FILE* pFilFile = fopen("cekencapsulatereq.js", "r");
}
case AMQP_CHANNEL_CLOSE_METHOD: {
amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
fprintf(stderr, "%s: server channel error %d, message: %.*s\n",
context,
m->reply_code,
(int) m->reply_text.len, (char *) m->reply_text.bytes);
break;
}
default:
fprintf(stderr, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id);
break;
}
break;
}
exit(1);
}
static void dump_row(long count, int numinrow, int *chs) {
int i;
printf("%08lX:", count - numinrow);
if (numinrow > 0) {
for (i = 0; i < numinrow; i++) {
if (i == 8)
printf(" :");
printf(" %02X", chs[i]);
}
for (i = numinrow; i < 16; i++) {
if (i == 8)
printf(" :");
printf(" ");
}
printf(" ");
for (i = 0; i < numinrow; i++) {
if (isprint(chs[i]))
printf("%c", chs[i]);
else
printf(".");
}
}
printf("\n");
}
static int rows_eq(int *a, int *b) {
int i;
for (i=0; i<16; i++)
if (a[i] != b[i])
return 0;
return 1;
}
void amqp_dump(void const *buffer, size_t len) {
unsigned char *buf = (unsigned char *) buffer;
long count = 0;
int numinrow = 0;
int chs[16];
int oldchs[16];
int showed_dots = 0;
int i;
for (i = 0; i < len; i++) {
int ch = buf[i];
if (numinrow == 16) {
int i;
if (rows_eq(oldchs, chs)) {
if (!showed_dots) {
showed_dots = 1;
printf(" .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n");
}
} else {
showed_dots = 0;
dump_row(count, numinrow, chs);
}
for (i=0; i<16; i++)
oldchs[i] = chs[i];
numinrow = 0;
}
count++;
chs[numinrow++] = ch;
}
dump_row(count, numinrow, chs);
if (numinrow != 0)
printf("%08lX:\n", count);
}
/**************************************************************************
Function: main
Description:
The c standard 'main' entry point function.
Returns:
returns integer which is passed back to the parent process
**************************************************************************/
int main(int argc, char *argv[]) {
char const *hostname;
int port;
char const *exchange;
char const *bindingkey;
int sockfd;
amqp_connection_state_t conn;
amqp_bytes_t queuename = amqp_cstring_bytes("testing");
hostname = "localhost";
port = 5672;
conn = amqp_new_connection();
die_on_error(sockfd = amqp_open_socket(hostname, port),
"Opening socket");
amqp_set_sockfd(conn, sockfd);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn),
"Opening channel");
amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 0, 0, amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn),
"Consuming");
//****************************************************
// TODO: Insert core of your message queueing code here
//****************************************************
send_batch(conn, (char*)queuename.bytes, 10, 1000);
//****************************************************
// TODO: Free any allocated resources before exiting
//****************************************************
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
"Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
"Closing connection");
die_on_error(amqp_destroy_connection(conn),
"Ending connection");
exit(0);
}
void send_batch(amqp_connection_state_t conn,
char const *queue_name,
int rate_limit,
int message_count)
{
uint64_t start_time = now_microseconds();
int i;
int sent = 0;
int previous_sent = 0;
uint64_t previous_report_time = start_time;
uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
for (i = 0; i < message_count; i++) {
uint64_t now = now_microseconds();
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
props.content_type = amqp_cstring_bytes("text/plain");
props.delivery_mode = 2; /* persistent delivery mode */
die_on_error(amqp_basic_publish(conn,
1,
amqp_cstring_bytes(""),
amqp_cstring_bytes(queue_name),
0,
0,
&props,
amqp_cstring_bytes("Hello world!")),
"Publishing");
sent++;
if (now > next_summary_time) {
int countOverInterval = sent - previous_sent;
double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);
printf("%d ms: Sent %d - %d since last report (%d Hz)\n",
(int)(now - start_time) / 1000, sent, countOverInterval, (int) intervalRate);
previous_sent = sent;
previous_report_time = now;
next_summary_time += SUMMARY_EVERY_US;
}
while (((i * 1000000.0) / (now - start_time)) > rate_limit) {
microsleep(20 FILE* pFilFile = fopen("cekencapsulatereq.js", "r");00);
now = now_microseconds();
}
}
{
uint64_t stop_time = now_microseconds();
int total_delta = stop_time - start_time;
printf("PRODUCER - Message count: %d\n", message_count);
printf("Total time, milliseconds: %d\n", total_delta / 1000);
printf("Overall messages-per-second: %g\n", (message_count / (total_delta / 1000000.0)));
}
}
-----Original Message-----
From: rabbitmq-discuss-bounces at lists.rabbitmq.com [mailto:rabbitmq-discuss-bounces at lists.rabbitmq.com] On Behalf Of Simon MacMullen
Sent: 17 November 2011 15:44
To: rabbitmq-discuss at lists.rabbitmq.com
Subject: Re: [rabbitmq-discuss] Messages disappearing
On 17/11/11 15:38, Nick Pateman wrote:
> I've set no_ack to 0 in my amq_basic_consume call and the messages are
> now appearing in the queue but as unacknowledged.
So all this sounds like the messages are being sent to your application:
no_ack = 1 -> messages are sent to your app, RabbitMQ forgets about them immediately.
no_ack = 0 -> messages are sent to your app, RabbitMQ treats them as unacknowledged until you ack.
The source code you posted doesn't show any code that actually handles received message frames - you do have some right?
Can you post your whole source somewhere? It's quite difficult to tell what is going on looking at fragments.
Cheers, Simon
--
Simon MacMullen
RabbitMQ, VMware
_______________________________________________
rabbitmq-discuss mailing list
rabbitmq-discuss at lists.rabbitmq.com
https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
More information about the rabbitmq-discuss
mailing list