[rabbitmq-discuss] sample postgresql C function using librabbitmq
Michael Nacos
m.nacos at gmail.com
Thu Nov 26 14:19:13 GMT 2009
We do a lot of database work involving PostgreSQL, and some of our new
processes use RabbitMQ. We currently use an Erlang-based monitoring system for
queues, consumers and some non-rabbit things. Since there is a C RabbitMQ
library these days (thanks Tony), I have tried to achieve the same thing
through C functions interfacing directly with the AMQP broker, removing the
need for an intermediate layer when pushing to or pulling from the database.
When I find a little more time, I am hoping to organize the most useful
RabbitMQ-related functions I have written into a PostgreSQL module (probably
hosted on GitHub). Until then, here is some toy code in the public domain:
regards, Michael
/*
** Filename: rabbit.c
*/
#include "postgres.h"

#include <ctype.h>
#include <string.h>
#include <unistd.h>

#include "fmgr.h"
#include "funcapi.h"
#include "executor/spi.h"
#include "nodes/execnodes.h"
#include "commands/trigger.h"
#include "utils/builtins.h"

#include "amqp.h"
#include "amqp_framing.h"
/*
 * Emit the PostgreSQL module "magic block" when the server headers provide
 * the macro, so the server can check at load time that this library was
 * built against a compatible version. The #ifdef keeps the file compilable
 * against headers that do not define PG_MODULE_MAGIC.
 */
#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif
/*
HOW TO run this toy code (you need the postgresql sources and librabbitmq.so installed)
--------------------------------------------------------------------------------------
[assuming postgresql is installed in /usr/local/pgsql8.4.1]
make rabbit.so; mv rabbit.so /usr/local/pgsql8.4.1/lib
cat rabbit.sql | /usr/local/pgsql8.4.1/bin/psql -U postgres db_of_choice
echo "SELECT rabbit_count('localhost');" | /usr/local/pgsql8.4.1/bin/psql -U postgres ...
# Makefile sample
--------------------------------------------------------------------
SERVER_INCLUDES += -I $(shell /usr/local/pgsql8.4.1/bin/pg_config --includedir)
SERVER_INCLUDES += -I $(shell /usr/local/pgsql8.4.1/bin/pg_config --includedir-server)
CFLAGS += -O3 $(SERVER_INCLUDES)
.SUFFIXES: .so
.c.so:
$(CC) $(CFLAGS) -fpic -c $<
$(CC) $(CFLAGS) -shared -lrabbitmq -o $@ $(basename $<).o
# rabbit.sql sample
------------------------------------------------------------------
CREATE OR REPLACE FUNCTION rabbit_count(hostname VARCHAR)
RETURNS INT AS 'rabbit.so', 'rabbit_count' LANGUAGE 'C';
#
------------------------------------------------------------------------------------
*/
/* toy postgresql C function rabbit_count()
accepts a single VARCHAR argument (hostname)
returns NULL if there is a connection error
or the number of consumers currently attached to a queue called
'PREDEFINED' */
PG_FUNCTION_INFO_V1(rabbit_count);
Datum rabbit_count(PG_FUNCTION_ARGS)
{
int ret = -1;
int port;
char const *queue;
char const *user; char const *pass;
int sockfd;
amqp_connection_state_t conn;
if (PG_ARGISNULL(0)) { PG_RETURN_NULL(); }
VarChar *sql = PG_GETARG_VARCHAR_P(0); unsigned int size = VARSIZE(sql)
- VARHDRSZ;
char *hostname = (char *) palloc(size+1);
strncpy(hostname,VARDATA(sql),size); hostname[size] = '\0'; //
converting varchar into native string
port = 5672;
queue = "PREDEFINED";
user = "guest"; pass = "guest";
conn = amqp_new_connection();
if (conn < 0) {
elog(ERROR, "rabbit.so (rabbit_count) -- amqp_new_connection()
error");
return PointerGetDatum(NULL);
}
sockfd = amqp_open_socket(hostname, port);
if (sockfd < 0) {
elog(ERROR, "rabbit.so (rabbit_count) -- amqp_open_socket() error");
return PointerGetDatum(NULL);
}
amqp_set_sockfd(conn, sockfd);
// probably needs something like die_on_amqp_error -- see below
amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, user,
pass);
// probably needs something like die_on_amqp_error -- see below
amqp_channel_open(conn, 1);
// die_on_amqp_error(amqp_rpc_reply, "Opening channel");
// after the queue argument, numbers correspond to bool vals for
passive, durable, exclusive, auto_delete
amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1,
amqp_cstring_bytes(queue), 0, 1, 0, 0, AMQP_EMPTY_TABLE);
// die_on_amqp_error(amqp_rpc_reply, "Declaring queue");
// uint32_t message_count;
// uint32_t consumer_count;
ret = r->consumer_count;
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
close(sockfd);
pfree(hostname);
if ( ret == -1 ) {
elog(ERROR, "rabbit.so (rabbit_count) -- ret error");
return PointerGetDatum(NULL);
}
return (ret);
}
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20091126/db44fec4/attachment.htm
More information about the rabbitmq-discuss
mailing list