[rabbitmq-discuss] sample postgresql C function using librabbitmq

Michael Nacos m.nacos at gmail.com
Thu Nov 26 14:19:13 GMT 2009


We do a lot of db work involving postgresql and some of our new processes
use rabbitmq. We currently use an Erlang-based monitoring system for queues,
consumers and some
non-rabbit things. Since there is a C rabbitmq library these days (thanks
Tony), I have tried to achieve the same thing through C functions
interfacing directly with the AMQP broker,
removing the need for an intermediate layer when pushing/pulling from the
database. When I find a little more time, I am hoping to organize the most
useful rabbitmq-related functions
I have written into a postgresql module (probably github). Until then, here
is some toy code in the public domain:

regards, Michael

/*
** Filename: rabbit.c
*/

#include "postgres.h"

#include <ctype.h>
#include <unistd.h>

#include "fmgr.h"
#include "funcapi.h"
#include "executor/spi.h"
#include "nodes/execnodes.h"
#include "commands/trigger.h"
#include "utils/builtins.h"
#include "amqp.h"
#include "amqp_framing.h"

#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif

/*
    HOW TO run this toy code (you need postgresql sources and librabbitmq.so installed)

--------------------------------------------------------------------------------------
    [assuming postgresql is installed in /usr/local/pgsql8.4.1]

    make rabbit.so; mv rabbit.so /usr/local/pgsql8.4.1/lib
    cat rabbit.sql | /usr/local/pgsql8.4.1/bin/psql -U postgres db_of_choice
    echo "SELECT rabbit_count('localhost');" | /usr/local/pgsql8.4.1/bin/psql -U postgres ...

    # Makefile sample
--------------------------------------------------------------------

    SERVER_INCLUDES += -I $(shell /usr/local/pgsql8.4.1/bin/pg_config --includedir)
    SERVER_INCLUDES += -I $(shell /usr/local/pgsql8.4.1/bin/pg_config --includedir-server)
    CFLAGS += -O3 $(SERVER_INCLUDES)
    .SUFFIXES:      .so
    .c.so:
        $(CC) $(CFLAGS) -fpic -c $<
        $(CC) $(CFLAGS) -shared -lrabbitmq -o $@ $(basename $<).o

    # rabbit.sql sample
------------------------------------------------------------------

        CREATE OR REPLACE FUNCTION rabbit_count(hostname VARCHAR)
            RETURNS INT AS 'rabbit.so', 'rabbit_count' LANGUAGE 'C';

    #
------------------------------------------------------------------------------------
*/

/* toy postgresql C function rabbit_count()
      accepts a single VARCHAR argument (hostname)
      returns NULL if there is a connection error
      or the number of consumers currently attached to a queue called
'PREDEFINED'      */

PG_FUNCTION_INFO_V1(rabbit_count);
Datum rabbit_count(PG_FUNCTION_ARGS)
{
    int ret = -1;
    int port;
    char const *queue;
    char const *user; char const *pass;
      int sockfd;
      amqp_connection_state_t conn;

    if (PG_ARGISNULL(0)) { PG_RETURN_NULL(); }
    VarChar *sql = PG_GETARG_VARCHAR_P(0); unsigned int size = VARSIZE(sql)
- VARHDRSZ;
    char *hostname = (char *) palloc(size+1);
    strncpy(hostname,VARDATA(sql),size); hostname[size] = '\0'; //
converting varchar into native string

    port = 5672;
    queue = "PREDEFINED";
    user = "guest"; pass = "guest";

      conn = amqp_new_connection();
    if (conn < 0) {
        elog(ERROR, "rabbit.so (rabbit_count) -- amqp_new_connection()
error");
        return PointerGetDatum(NULL);
    }

      sockfd = amqp_open_socket(hostname, port);
      if (sockfd < 0) {
        elog(ERROR, "rabbit.so (rabbit_count) -- amqp_open_socket() error");
        return PointerGetDatum(NULL);
    }
    amqp_set_sockfd(conn, sockfd);

    // probably needs something like die_on_amqp_error -- see below
      amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, user,
pass);
    // probably needs something like die_on_amqp_error -- see below
    amqp_channel_open(conn, 1);
    // die_on_amqp_error(amqp_rpc_reply, "Opening channel");

    // after the queue argument, numbers correspond to bool vals for
passive, durable, exclusive, auto_delete
    amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1,
amqp_cstring_bytes(queue), 0, 1, 0, 0, AMQP_EMPTY_TABLE);
    // die_on_amqp_error(amqp_rpc_reply, "Declaring queue");

    //   uint32_t message_count;
    //   uint32_t consumer_count;
    ret = r->consumer_count;

    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);
    close(sockfd);

    pfree(hostname);

    if ( ret == -1 ) {
        elog(ERROR, "rabbit.so (rabbit_count) -- ret error");
        return PointerGetDatum(NULL);
    }

    return (ret);
}
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20091126/db44fec4/attachment.htm 


More information about the rabbitmq-discuss mailing list