[rabbitmq-discuss] pub-sub part 2: erlang consume

Joe Lee codewalkerjoe at gmail.com
Thu Apr 10 00:17:25 BST 2008


I have published the message asynchronously to the exchange.  Now to consume
the messages from the exchange, I can use either BasicGet or BasicConsume.
When you are consuming messages in pub/sub model, do you have to specify
what exchange to look for?  In addition, when you consume message from an
exchange regardless of messaging model(pub/sub), you still have to declare a
queue, bind it and consume the message from the queue?

BasicConsume and BasicGet there is a variable named queue = Q. In pub-sub
case, how do you define null queue in Erlang-client? In basic.consume I set
the queue = <<"">> like below:

BasicConsume = #'basic.consume'{ticket = Ticket,
                                    queue = <<"">>,

In basic.publish, I set immediate = false so that when the consumer started
later, message will be in the queue for it to consume.

Rabbitmq server throwing errors when I run the consumer code after the
message have been published to the exchange.

(rabbit at home)5> amqp_async_consume:amqp_lifecycle_consume().
Connection: {<0.178.0>,direct}
** exception exit: {{amqp_async_consume,amqp_lifecycle_consume,0},
                    {line,40},
                    match,
                    [{'queue.declare_ok',
                         <<"amq.q.gen1_rabbit at home_20080409224223_">>,0,0}]}
     in function  amqp_async_consume:amqp_lifecycle_consume/0


This is the line causing the problem:
amqp_channel:call(Channel,QueueDeclare) in
#'queue.declare_ok'{queue = Q,
                        message_count = MessageCount,
                        consumer_count = ConsumerCount}
                       = amqp_channel:call(Channel,QueueDeclare),


-module(amqp_async_consume).

-include_lib("rabbitmq_server/include/rabbit_framing.hrl").
-include_lib("rabbitmq_server/include/rabbit.hrl").
-include_lib("rabbitmq_client/include/amqp_client.hrl").

-export([amqp_lifecycle_consume/0]).

amqp_lifecycle_consume() ->
    User = Password = "guest",
    Realm = <<"/data">>,

    %% Start a connection to the server

    Connection = amqp_connection:start(User, Password),
    io:format("Connection: ~p~n",[Connection]),
    %% Once you have a connection to the server, you can start an AMQP
channel gain access to a realm

    Channel = amqp_connection:open_channel(Connection),
    Access = #'access.request'{realm = Realm,
                               exclusive = false,
                               passive = true,
                               active = true,
                               write = true,
                               read = true},
    #'access.request_ok'{ticket = Ticket} = amqp_channel:call(Channel,
Access),

    %% Register a consumer to listen to a queue
    Q = <<"">>,
    X = <<"x">>,
    BindKey = <<"a.b.c.*">>,

    QueueDeclare = #'queue.declare'{ticket = Ticket, queue = Q,
                                    passive = false, durable = false,
                                    exclusive = false, auto_delete = false,
                                    nowait = false, arguments = []},
    #'queue.declare_ok'{queue = Q,
                        message_count = MessageCount,
                        consumer_count = ConsumerCount}
                       = amqp_channel:call(Channel,QueueDeclare),


    %ExchangeDeclare = #'exchange.declare'{ticket = Ticket, exchange = X,
type = <<"topic">>,
    %                                      passive = false, durable = false,
auto_delete = false, internal = false,
    %                                      nowait = false, arguments = []},
    %#'exchange.declare_ok'{} = amqp_channel:call(Channel, ExchangeDeclare),

    QueueBind = #'queue.bind'{ticket = Ticket, queue = Q, exchange = X,
                              routing_key = BindKey, nowait = false,
arguments = []},
    #'queue.bind_ok'{} = amqp_channel:call(Channel, QueueBind),



    %ConsumerTag = <<"">>,
    %DeliveryTag = <<"">>,
    BasicConsume = #'basic.consume'{ticket = Ticket,
                                    queue = Q,
                                    consumer_tag = <<"">>,
                                    no_local = false,
                                    no_ack = true,
                                    exclusive = false,
                                    nowait = false},
    #'basic.consume_ok'{consumer_tag = ConsumerTag}
        = amqp_channel:call(Channel, BasicConsume, self()),

    %% If the registration was sucessful, then consumer will be notified

    receive
        #'basic.consume_ok'{consumer_tag = ConsumerTag} -> ok
    end,

    %% When a message is routed to the queue, it will then be delivered to
this consumer

    receive
        {#'basic.deliver'{delivery_tag = DeliveryTag}, Content} ->
            #content{payload_fragments_rev = [Payload]} = Content,
            io:format("Message received: ~p~n", [Payload])
    after 2000 ->
        exit(did_not_receive_message)
    end,

    %% After the consumer is finished interacting with the queue, it can
deregister itself

    BasicCancel = #'basic.cancel'{consumer_tag = ConsumerTag,
                                  nowait = false},
    #'basic.cancel_ok'{consumer_tag = ConsumerTag} =
amqp_channel:call(Channel,BasicCancel).

Thank you,
Joe
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20080409/5c96a12d/attachment.htm 


More information about the rabbitmq-discuss mailing list