We do a lot of database work involving PostgreSQL, and some of our new processes use RabbitMQ. We currently use an Erlang-based monitoring system for queues, consumers and some<br>non-RabbitMQ things. Since there is a C RabbitMQ client library these days (thanks Tony), I have tried to achieve the same thing through C functions interfacing directly with the AMQP broker,<br>
removing the need for an intermediate layer when pushing to/pulling from the database. When I find a little more time, I am hoping to organize the most useful RabbitMQ-related functions<br>I have written into a PostgreSQL module (probably hosted on GitHub). Until then, here is some toy code in the public domain:<br>
<br>regards, Michael<br><br><div style="margin-left: 40px;">/*<br>** Filename: rabbit.c<br>*/<br><br>#include &quot;postgres.h&quot;<br>#include &quot;fmgr.h&quot;<br>#include &quot;funcapi.h&quot;<br>#include &quot;executor/spi.h&quot;<br>
#include &quot;nodes/execnodes.h&quot;<br>#include &quot;commands/trigger.h&quot;<br>#include &quot;utils/builtins.h&quot;<br>#include &lt;ctype.h&gt;<br>#include &lt;unistd.h&gt;<br>#include &quot;amqp.h&quot;<br>#include &quot;amqp_framing.h&quot;<br>
<br>#ifdef PG_MODULE_MAGIC<br>PG_MODULE_MAGIC;<br>#endif<br><br>/*<br>    HOW TO run this toy code (you need postgresql sources and librabbitmq.so installed)<br>    --------------------------------------------------------------------------------------<br>
    [assuming postgresql is installed in /usr/local/pgsql8.4.1]<br><br>    make rabbit.so; mv rabbit.so /usr/local/pgsql8.4.1/lib<br>    cat rabbit.sql | /usr/local/pgsql8.4.1/bin/psql -U postgres db_of_choice<br>    echo &quot;SELECT rabbit_count(&#39;localhost&#39;);&quot; | /usr/local/pgsql8.4.1/bin/psql -U postgres ...<br>
<br>    # Makefile sample --------------------------------------------------------------------<br><br>    SERVER_INCLUDES += -I $(shell /usr/local/pgsql8.4.1/bin/pg_config --includedir)<br>    SERVER_INCLUDES += -I $(shell /usr/local/pgsql8.4.1/bin/pg_config --includedir-server)<br>
    CFLAGS += -O3 $(SERVER_INCLUDES)<br>    .SUFFIXES:      .so<br>    .c.so:<br>        $(CC) $(CFLAGS) -fpic -c $&lt;<br>        $(CC) $(CFLAGS) -shared -lrabbitmq -o $@ $(basename $&lt;).o<br><br>    # rabbit.sql sample ------------------------------------------------------------------<br>
<br>        CREATE OR REPLACE FUNCTION rabbit_count(hostname VARCHAR)<br>            RETURNS INT AS &#39;rabbit.so&#39;, &#39;rabbit_count&#39; LANGUAGE &#39;C&#39;;<br><br>    # ------------------------------------------------------------------------------------<br>
*/<br><br>/* toy postgresql C function rabbit_count()<br>      accepts a single VARCHAR argument (hostname)<br>      returns NULL if there is a connection error<br>      or the number of consumers currently attached to a queue called &#39;PREDEFINED&#39;      */<br>
<br>PG_FUNCTION_INFO_V1(rabbit_count);<br>
<br>
/*<br>
** rabbit_count(hostname VARCHAR) RETURNS INT<br>
**<br>
** Opens an AMQP connection to hostname:5672 as guest/guest on vhost &quot;/&quot;, declares the<br>
** durable queue &#39;PREDEFINED&#39; on channel 1 and returns the consumer_count from the reply.<br>
** Returns SQL NULL when the argument is NULL or when any connection step fails.<br>
**<br>
** Failures are reported with elog(WARNING): elog(ERROR) never returns to the caller, so it<br>
** would make the NULL result unreachable and skip closing the connection and the socket.<br>
**<br>
** NOTE(review): the queue is declared with passive = 0, so the broker will presumably<br>
** create &#39;PREDEFINED&#39; if it does not exist yet -- confirm this is intended.<br>
*/<br>
Datum rabbit_count(PG_FUNCTION_ARGS)<br>
{<br>
    const int port = 5672;<br>
    char const *queue = &quot;PREDEFINED&quot;;<br>
    char const *user = &quot;guest&quot;;<br>
    char const *pass = &quot;guest&quot;;<br>
    VarChar *arg;<br>
    size_t size;<br>
    char *hostname;<br>
    int sockfd;<br>
    bool have_count = false;<br>
    int32 count = 0;<br>
    amqp_connection_state_t conn;<br>
    amqp_rpc_reply_t login_reply;<br>
    amqp_queue_declare_ok_t *r;<br>
<br>
    if (PG_ARGISNULL(0)) {<br>
        PG_RETURN_NULL();<br>
    }<br>
<br>
    /* the varchar payload is not NUL-terminated: copy it into a native C string */<br>
    arg = PG_GETARG_VARCHAR_P(0);<br>
    size = VARSIZE(arg) - VARHDRSZ;<br>
    hostname = (char *) palloc(size + 1);<br>
    memcpy(hostname, VARDATA(arg), size);<br>
    hostname[size] = &#39;\0&#39;;<br>
<br>
    conn = amqp_new_connection();<br>
    if (conn == NULL) { /* conn is a pointer handle, so failure is NULL rather than a negative value */<br>
        pfree(hostname);<br>
        elog(WARNING, &quot;rabbit.so (rabbit_count) -- amqp_new_connection() error&quot;);<br>
        PG_RETURN_NULL();<br>
    }<br>
<br>
    sockfd = amqp_open_socket(hostname, port);<br>
    pfree(hostname); /* only needed to open the socket */<br>
    if (sockfd &lt; 0) {<br>
        amqp_destroy_connection(conn);<br>
        elog(WARNING, &quot;rabbit.so (rabbit_count) -- amqp_open_socket() error&quot;);<br>
        PG_RETURN_NULL();<br>
    }<br>
    amqp_set_sockfd(conn, sockfd);<br>
<br>
    login_reply = amqp_login(conn, &quot;/&quot;, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, user, pass);<br>
    if (login_reply.reply_type == AMQP_RESPONSE_NORMAL) {<br>
        if (amqp_channel_open(conn, 1) != NULL) {<br>
            // after the queue argument, numbers correspond to bool vals for passive, durable, exclusive, auto_delete<br>
            r = amqp_queue_declare(conn, 1, amqp_cstring_bytes(queue), 0, 1, 0, 0, AMQP_EMPTY_TABLE);<br>
            if (r != NULL) { /* NULL means the declare failed; do not dereference it */<br>
                count = (int32) r-&gt;consumer_count;<br>
                have_count = true;<br>
                amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);<br>
            }<br>
        }<br>
        /* best-effort shutdown: the replies are deliberately ignored */<br>
        amqp_connection_close(conn, AMQP_REPLY_SUCCESS);<br>
    }<br>
<br>
    /* always release the connection state and the socket, whatever happened above */<br>
    amqp_destroy_connection(conn);<br>
    close(sockfd);<br>
<br>
    if (!have_count) {<br>
        elog(WARNING, &quot;rabbit.so (rabbit_count) -- ret error&quot;);<br>
        PG_RETURN_NULL();<br>
    }<br>
<br>
    PG_RETURN_INT32(count);<br>
}<br></div><br>