We do a lot of db work involving postgresql and some of our new processes use rabbitmq. We currently use an Erlang-based monitoring system for queues, consumers and some<br>non-rabbit things. Since there is a C rabbitmq library these days (thanks Tony), I have tried to achieve the same thing through C functions interfacing directly with the AMQP broker,<br>
removing the need for an intermediate layer when pushing/pulling from the database. When I find a little more time, I am hoping to organize the most useful rabbitmq-related functions<br>I have written into a postgresql module (probably github). Until then, here is some toy code in the public domain:<br>
<br>regards, Michael<br><br><div style="margin-left: 40px;">/*<br>** Filename: rabbit.c<br>*/<br><br>#include "postgres.h"<br>#include "fmgr.h"<br>#include "funcapi.h"<br>#include "executor/spi.h"<br>
#include "nodes/execnodes.h"<br>#include "commands/trigger.h"<br>#include "utils/builtins.h"<br>#include <ctype.h><br>#include "amqp.h"<br>#include "amqp_framing.h"<br>
<br>#ifdef PG_MODULE_MAGIC<br>PG_MODULE_MAGIC;<br>#endif<br><br>/*<br>    HOW TO run this toy code (you need postgresql sources and librabbitmq.so installed)<br>    --------------------------------------------------------------------------------------<br>
    [assuming postgresql is installed in /usr/local/pgsql8.4.1]<br><br>    make rabbit.so; mv rabbit.so /usr/local/pgsql8.4.1/lib<br>    cat rabbit.sql | /usr/local/pgsql8.4.1/bin/psql -U postgres db_of_choice<br>    echo "SELECT rabbit_count('localhost');" | /usr/local/pgsql8.4.1/bin/psql -U postgres ...<br>
<br>    # Makefile sample --------------------------------------------------------------------<br><br>    SERVER_INCLUDES += -I $(shell /usr/local/pgsql8.4.1/bin/pg_config --includedir)<br>    SERVER_INCLUDES += -I $(shell /usr/local/pgsql8.4.1/bin/pg_config --includedir-server)<br>
    CFLAGS += -O3 $(SERVER_INCLUDES)<br>    .SUFFIXES:      .so<br>    .c.so:<br>        $(CC) $(CFLAGS) -fpic -c $<<br>        $(CC) $(CFLAGS) -shared -lrabbitmq -o $@ $(basename $<).o<br><br>    # rabbit.sql sample ------------------------------------------------------------------<br>
<br>        CREATE OR REPLACE FUNCTION rabbit_count(hostname VARCHAR)<br>            RETURNS INT AS 'rabbit.so', 'rabbit_count' LANGUAGE 'C';<br><br>    # ------------------------------------------------------------------------------------<br>
*/<br><br>/* toy postgresql C function rabbit_count()<br>      accepts a single VARCHAR argument (hostname)<br>      returns NULL if there is a connection error<br>      or the number of consumers currently attached to a queue called 'PREDEFINED'      */<br>
<br>PG_FUNCTION_INFO_V1(rabbit_count);<br>Datum rabbit_count(PG_FUNCTION_ARGS)<br>{<br>    int ret = -1;<br>    int port;<br>    char const *queue;<br>    char const *user; char const *pass;<br>      int sockfd;<br>      amqp_connection_state_t conn;<br>
<br>    if (PG_ARGISNULL(0)) { PG_RETURN_NULL(); } <br>    VarChar *sql = PG_GETARG_VARCHAR_P(0); unsigned int size = VARSIZE(sql) - VARHDRSZ;<br>    char *hostname = (char *) palloc(size+1);<br>    strncpy(hostname,VARDATA(sql),size); hostname[size] = '\0'; // converting varchar into native string<br>
    <br>    port = 5672;<br>    queue = "PREDEFINED";<br>    user = "guest"; pass = "guest";<br><br>      conn = amqp_new_connection();<br>    if (conn < 0) {<br>        elog(ERROR, "rabbit.so (rabbit_count) -- amqp_new_connection() error");<br>
        return PointerGetDatum(NULL);<br>    }<br>    <br>      sockfd = amqp_open_socket(hostname, port);<br>      if (sockfd < 0) {<br>        elog(ERROR, "rabbit.so (rabbit_count) -- amqp_open_socket() error");<br>
        return PointerGetDatum(NULL);<br>    }<br>    amqp_set_sockfd(conn, sockfd);<br><br>    // probably needs something like die_on_amqp_error -- see below<br>      amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, user, pass);<br>
    // probably needs something like die_on_amqp_error -- see below<br>    amqp_channel_open(conn, 1);<br>    // die_on_amqp_error(amqp_rpc_reply, "Opening channel");<br><br>    // after the queue argument, numbers correspond to bool vals for passive, durable, exclusive, auto_delete<br>
    amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_cstring_bytes(queue), 0, 1, 0, 0, AMQP_EMPTY_TABLE);<br>    // die_on_amqp_error(amqp_rpc_reply, "Declaring queue");<br><br>    //   uint32_t message_count;<br>
    //   uint32_t consumer_count;<br>    ret = r->consumer_count;<br><br>    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);<br>    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);<br>    amqp_destroy_connection(conn);<br>
    close(sockfd);<br><br>    pfree(hostname);<br><br>    if ( ret == -1 ) {<br>        elog(ERROR, "rabbit.so (rabbit_count) -- ret error");<br>        return PointerGetDatum(NULL);<br>    }<br><br>    return (ret);<br>
}<br></div><br>