[rabbitmq-discuss] Synchronous publish with C client

Alexandru Scvorţov alexandru at rabbitmq.com
Fri Feb 11 00:35:00 GMT 2011


Hi,

What you're seeing is [almost] the correct behaviour.  Publishing a
mandatory message that cannot be routed to any queue, or an immediate
message to a queue with no consumers, is not considered an error, so the
commit will succeed.  In this case the broker will, however, respond with
a basic.return informing the client that the message was not delivered.
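
To make the distinction concrete, here is a rough, self-contained model
of that decision.  It does not talk to a broker or use the C client; the
names (route_state_t, publish_outcome_t, model_publish) are hypothetical
and the branch order is my simplified reading of the broker's behaviour.
The reply codes 312 (NO_ROUTE) and 313 (NO_CONSUMERS) are the ones AMQP
0-9-1 defines for returned messages.

```c
#include <stdbool.h>
#include <stddef.h>

/* Reply codes defined by AMQP 0-9-1 for returned messages. */
enum { REPLY_NONE = 0, REPLY_NO_ROUTE = 312, REPLY_NO_CONSUMERS = 313 };

/* Hypothetical snapshot of what the broker finds for one publish. */
typedef struct {
    size_t matching_queues;  /* queues the exchange routes the message to */
    size_t ready_consumers;  /* consumers on those queues able to take it now */
} route_state_t;

/* Hypothetical summary of what the publisher would observe. */
typedef struct {
    bool commit_ok;   /* tx.commit is still answered with commit-ok */
    bool returned;    /* a basic.return is sent back to the publisher */
    int  reply_code;  /* 312, 313, or 0 when nothing is returned */
} publish_outcome_t;

/* Simplified model (an assumption, not broker code): an undeliverable
 * message never fails the transaction, it only triggers a basic.return
 * when the matching flag was set on the publish. */
publish_outcome_t model_publish(route_state_t s, bool mandatory, bool immediate)
{
    publish_outcome_t out = { true, false, REPLY_NONE };

    if (mandatory && s.matching_queues == 0) {
        out.returned = true;
        out.reply_code = REPLY_NO_ROUTE;
    } else if (immediate && s.ready_consumers == 0) {
        out.returned = true;
        out.reply_code = REPLY_NO_CONSUMERS;
    }
    return out;
}
```

With both flags set and no matching queue, as in the code quoted below,
model_publish reports commit_ok as true together with a return carrying
code 312, which is why the "Error committing tx" branch is never taken.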

Unfortunately, I don't think our C client has any way of handling
basic.returns, so you're out of luck.
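
For reference, this is roughly what a client has to recognise on the
wire.  In AMQP 0-9-1 a basic.return is a method frame (type 1) whose
payload starts with class id 60 and method id 50, followed by the reply
code.  The sketch below is library-independent and only inspects a raw
buffer; the function name basic_return_reply_code is hypothetical and is
not part of the C client.

```c
#include <stddef.h>
#include <stdint.h>

/* Constants from the AMQP 0-9-1 specification. */
enum {
    FRAME_METHOD  = 1,     /* frame type octet for method frames */
    FRAME_END     = 0xCE,  /* frame-end octet */
    CLASS_BASIC   = 60,
    METHOD_RETURN = 50
};

/* Read big-endian integers from the buffer. */
static uint16_t be16(const uint8_t *p)
{
    return (uint16_t)((p[0] << 8) | p[1]);
}

static uint32_t be32(const uint8_t *p)
{
    return ((uint32_t)p[0] << 24) | ((uint32_t)p[1] << 16) |
           ((uint32_t)p[2] << 8)  |  (uint32_t)p[3];
}

/* Hypothetical helper: returns the reply code (e.g. 312) if buf holds one
 * complete basic.return method frame, or 0 otherwise.
 * Layout: type(1) channel(2) size(4) payload(size) frame-end(1). */
int basic_return_reply_code(const uint8_t *buf, size_t len)
{
    if (len < 8 || buf[0] != FRAME_METHOD)
        return 0;

    uint32_t size = be32(buf + 3);
    /* Payload must hold class id, method id and reply code (6 octets)
     * and fit in the buffer together with the frame-end octet. */
    if (size < 6 || size > len - 8)
        return 0;
    if (buf[7 + size] != FRAME_END)
        return 0;

    if (be16(buf + 7) != CLASS_BASIC || be16(buf + 9) != METHOD_RETURN)
        return 0;

    return be16(buf + 11);
}
```

A caller would pass each complete frame read from the socket to this
function and treat a non-zero result as "the last publish was returned".
The reply text, exchange and routing key that follow the reply code are
ignored here to keep the sketch short.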

The other clients are all capable of handling basic.returns (and
publisher confirms if you're using 2.3.1), so you might want to consider
using one of them.  Have a look at this blog post; it covers what you're
trying to achieve:
  http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/

Hope this helps.

Cheers,
Alex

On Thu, Feb 10, 2011 at 03:54:48PM -0700, Shane Head wrote:
> I know that publish is asynchronous, but from other threads I've read I
> thought that selecting the channel for a transaction, calling publish and
> then calling commit could be used to make it synchronous.  I've tried
> doing this in my C code and have been unable to get it to work.  I'm
> using the amq.direct exchange and sending to a nonexistent queue using
> the mandatory and immediate flags.  It appears as though the message is
> being silently dropped by the server, though.  I'm running RabbitMQ
> version 2.2.  Here is the code I'm using.  Please let me know of any
> errors there might be in my logic or code.  All I get is
> 
> [*] Sending some message
> 
> printed when I run this.  I would expect to see the 'Error committing tx' message if the queue does not exist, though.
> 
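> #include <stdint.h>
> #include <stdio.h>
> #include <stdlib.h>
> #include <string.h>
> #include <amqp.h>
> #include <amqp_framing.h>
> 
> int main(int argc, char *argv[])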
> {
>     int rc;
>     int sockfd;
>     uint16_t channel = 1;
>     amqp_rpc_reply_t reply;
> 
>     amqp_connection_state_t conn = amqp_new_connection();
>     //Connect
>     sockfd = amqp_open_socket("localhost", 5672);
>     if (sockfd < 0)
>     {
>         printf("Error connecting to localhost:5672\n");
>         return 1;
>     }
>     amqp_set_sockfd(conn, sockfd);
>     amqp_login(conn, "/" /* vhost */, 0 /* channel_max */,
>                 131072 /* frame_max */, 0 /* heartbeat */,
>                 AMQP_SASL_METHOD_PLAIN, "guest", "guest");
> 
>     //Create a channel
>     amqp_channel_open(conn, channel);
> 
>     //Set up tx
>     amqp_tx_select(conn, channel);
>     reply = amqp_get_rpc_reply(conn);
>     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
>     {
>         printf("Error selecting tx\n");
>         return 1;
>     }
> 
>     //Send  message
>     const char *message = "some message";
>     printf("[*] Sending %s\n", message);
>     amqp_basic_properties_t props;
>     memset(&props, 0x00, sizeof(props));
>     props.delivery_mode = 2; //persistent
> 
>     rc = amqp_basic_publish(conn, channel, amqp_cstring_bytes("amq.direct"),
>                             amqp_cstring_bytes("nonexistent_queue"), 1 /*mandatory*/,
>                             1 /*immediate*/, &props, amqp_cstring_bytes(message));
>     if (rc != 0)
>     {
>         char *errstr = amqp_error_string(-rc);
>         printf("Error publishing: %s\n", errstr);
>         free(errstr);
>     }
> 
>     //Now send commit and see if it worked
>     amqp_tx_commit(conn, channel);
>     reply = amqp_get_rpc_reply(conn);
>     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
>     {
>         printf("Error committing tx\n");
>         return 1;
>     }
> 
>     reply = amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS);
>     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
>     {
>         printf("Error closing channel\n");
>     }
>     amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
>     amqp_destroy_connection(conn);
> 
>     return 0;
> }
> 
> Cheers,
> Shane