We do a lot of db work involving postgresql and some of our new processes use rabbitmq. We currently use an Erlang-based monitoring system for queues, consumers and some<br>non-rabbit things. Since there is a C rabbitmq library these days (thanks Tony), I have tried to achieve the same thing through C functions interfacing directly with the AMQP broker,<br>
removing the need for an intermediate layer when pushing/pulling from the database. When I find a little more time, I am hoping to organize the most useful rabbitmq-related functions<br>I have written into a postgresql module (probably github). Until then, here is some toy code in the public domain:<br>
<br>regards, Michael<br><br><div style="margin-left: 40px;">/*<br>** Filename: rabbit.c<br>*/<br><br>#include "postgres.h"<br>#include "fmgr.h"<br>#include "funcapi.h"<br>#include "executor/spi.h"<br>
#include "nodes/execnodes.h"<br>#include "commands/trigger.h"<br>#include "utils/builtins.h"<br>#include <ctype.h><br>#include "amqp.h"<br>#include "amqp_framing.h"<br>
<br>#ifdef PG_MODULE_MAGIC<br>PG_MODULE_MAGIC;<br>#endif<br><br>/*<br> HOW TO run this toy code (you need postgresql sources and librabbitmq.so installed)<br> --------------------------------------------------------------------------------------<br>
[assuming postgresql is installed in /usr/local/pgsql8.4.1]<br><br> make rabbit.so; mv rabbit.so /usr/local/pgsql8.4.1/lib<br> cat rabbit.sql | /usr/local/pgsql8.4.1/bin/psql -U postgres db_of_choice<br> echo "SELECT rabbit_count('localhost');" | /usr/local/pgsql8.4.1/bin/psql -U postgres ...<br>
<br> # Makefile sample --------------------------------------------------------------------<br><br> SERVER_INCLUDES += -I $(shell /usr/local/pgsql8.4.1/bin/pg_config --includedir)<br> SERVER_INCLUDES += -I $(shell /usr/local/pgsql8.4.1/bin/pg_config --includedir-server)<br>
CFLAGS += -O3 $(SERVER_INCLUDES)<br> .SUFFIXES: .so<br> .c.so:<br> $(CC) $(CFLAGS) -fpic -c $<<br> $(CC) $(CFLAGS) -shared -lrabbitmq -o $@ $(basename $<).o<br><br> # rabbit.sql sample ------------------------------------------------------------------<br>
<br> CREATE OR REPLACE FUNCTION rabbit_count(hostname VARCHAR)<br> RETURNS INT AS 'rabbit.so', 'rabbit_count' LANGUAGE 'C';<br><br> # ------------------------------------------------------------------------------------<br>
*/<br><br>/* toy postgresql C function rabbit_count()<br> accepts a single VARCHAR argument (hostname)<br> returns NULL if there is a connection error<br> or the number of consumers currently attached to a queue called 'PREDEFINED' */<br>
<br>PG_FUNCTION_INFO_V1(rabbit_count);<br>Datum rabbit_count(PG_FUNCTION_ARGS)<br>{<br> int ret = -1;<br> int port;<br> char const *queue;<br> char const *user; char const *pass;<br> int sockfd;<br> amqp_connection_state_t conn;<br>
<br> if (PG_ARGISNULL(0)) { PG_RETURN_NULL(); } <br> VarChar *sql = PG_GETARG_VARCHAR_P(0); unsigned int size = VARSIZE(sql) - VARHDRSZ;<br> char *hostname = (char *) palloc(size+1);<br> strncpy(hostname,VARDATA(sql),size); hostname[size] = '\0'; // converting varchar into native string<br>
<br> port = 5672;<br> queue = "PREDEFINED";<br> user = "guest"; pass = "guest";<br><br> conn = amqp_new_connection();<br> if (conn < 0) {<br> elog(ERROR, "rabbit.so (rabbit_count) -- amqp_new_connection() error");<br>
return PointerGetDatum(NULL);<br> }<br> <br> sockfd = amqp_open_socket(hostname, port);<br> if (sockfd < 0) {<br> elog(ERROR, "rabbit.so (rabbit_count) -- amqp_open_socket() error");<br>
return PointerGetDatum(NULL);<br> }<br> amqp_set_sockfd(conn, sockfd);<br><br> // probably needs something like die_on_amqp_error -- see below<br> amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, user, pass);<br>
// probably needs something like die_on_amqp_error -- see below<br> amqp_channel_open(conn, 1);<br> // die_on_amqp_error(amqp_rpc_reply, "Opening channel");<br><br> // after the queue argument, numbers correspond to bool vals for passive, durable, exclusive, auto_delete<br>
amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_cstring_bytes(queue), 0, 1, 0, 0, AMQP_EMPTY_TABLE);<br> // die_on_amqp_error(amqp_rpc_reply, "Declaring queue");<br><br> // uint32_t message_count;<br>
// uint32_t consumer_count;<br> ret = r->consumer_count;<br><br> amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);<br> amqp_connection_close(conn, AMQP_REPLY_SUCCESS);<br> amqp_destroy_connection(conn);<br>
close(sockfd);<br><br> pfree(hostname);<br><br> if ( ret == -1 ) {<br> elog(ERROR, "rabbit.so (rabbit_count) -- ret error");<br> return PointerGetDatum(NULL);<br> }<br><br> return (ret);<br>
}<br></div><br>