[rabbitmq-discuss] Synchronous publish with C client

Shane Head shead at galileoprocessing.com
Thu Feb 10 22:54:48 GMT 2011


I know that publish is asynchronous, but from other threads I've read I thought that selecting the channel for a transaction, calling publish and then calling commit could be used to accomplish it.  I've tried doing this in my C code and have been unable to get it to work.  I'm using the amq.direct exchange and sending to a nonexistent queue using the mandatory and immediate flags.  It appears as though the message is being silently dropped by the server though.  I'm running rabbitmQ version 2.2.  Here is the code I'm using.  Please let me know of any errors there might be in my logic or code.  All I get is

[*] Sending some message

outputted when I run this.  I would expect to see the 'Error committingtx' print if the queue does not exist though.

int main(int argc, char *argv[])
{
    int rc;
    int sockfd;
    uint16_t channel = 1;
    amqp_rpc_reply_t reply;

    amqp_connection_state_t conn = amqp_new_connection();
    //Connect
    sockfd = amqp_open_socket("localhost", 5672);
    if (sockfd < 0)
    {
        printf("Error connecting to localhost:5672\n");
        return 1;
    }
    amqp_set_sockfd(conn, sockfd);
    amqp_login(conn, "/" /* vhost */, 0 /* channel_max */,
                131072 /* frame_max */, 0 /* heartbeat */,
                AMQP_SASL_METHOD_PLAIN, "guest", "guest");

    //Create a channel
    amqp_channel_open(conn, channel);

    //Set up tx
    amqp_tx_select(conn, channel);
    reply = amqp_get_rpc_reply(conn);
    if (reply.reply_type != AMQP_RESPONSE_NORMAL)
    {
        printf("Error selecting tx\n");
        return 1;
    }

    //Send  message
    const char *message = "some message";
    printf("[*] Sending %s\n", message);
    amqp_basic_properties_t props;
    memset(&props, 0x00, sizeof(props));
    props.delivery_mode = 2; //persistent

    rc = amqp_basic_publish(conn, channel, amqp_cstring_bytes("amq.direct"),
                            amqp_cstring_bytes("nonexistent_queue"), 1 /*mandatory*/,
                            1 /*immediate*/, &props, amqp_cstring_bytes(message));
    if (rc != 0)
    {
        char *errstr = amqp_error_string(-rc);
        printf("Error publishing: %s\n", errstr);
        free(errstr);
    }

    //Now send commit and see if it worked
    amqp_tx_commit(conn, channel);
    reply = amqp_get_rpc_reply(conn);
    if (reply.reply_type != AMQP_RESPONSE_NORMAL)
    {
        printf("Error committing tx\n");
        return 1;
    }

    reply = amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS);
    if (reply.reply_type != AMQP_RESPONSE_NORMAL)
    {
        printf("Error closing channel\n");
    }
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);

    return 0;
}

Cheers,
Shane



***************
This e-mail and any files transmitted with it may contain confidential and/or proprietary information. It is intended solely for the use of the individual or entity who is the intended recipient. Unauthorized use of this information is prohibited. If you have received this in error, please contact the sender by replying to this message and delete this material from any system it may be on.
***************
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20110210/8f10f07a/attachment.htm>


More information about the rabbitmq-discuss mailing list