I have published the message asynchronously to the exchange. Now to consume the messages from the exchange, I can use either BasicGet or BasicConsume. When you are consuming messages in pub/sub model, do you have to specify what exchange to look for? In addition, when you consume message from an exchange regardless of messaging model(pub/sub), you still have to declare a queue, bind it and consume the message from the queue?<br>
<br>BasicConsume and BasicGet there is a variable named queue = Q. In pub-sub case, how do you define null queue in Erlang-client? In basic.consume I set the queue = <<"">> like below:<br><br>BasicConsume = #'basic.consume'{ticket = Ticket,<br>
queue = <<"">>,<br><br>In basic.publish, I set immediate = false so that when the consumer started later, message will be in the queue for it to consume.<br>
<br>Rabbitmq server throwing errors when I run the consumer code after the message have been published to the exchange.<br><br>(rabbit@home)5> amqp_async_consume:amqp_lifecycle_consume(). <br>Connection: {<0.178.0>,direct}<br>
** exception exit: {{amqp_async_consume,amqp_lifecycle_consume,0},<br> {line,40},<br> match,<br> [{'queue.declare_ok',<br> <<"amq.q.gen1_rabbit@home_20080409224223_">>,0,0}]}<br>
in function amqp_async_consume:amqp_lifecycle_consume/0<br><br><br>This is the line causing the problem: amqp_channel:call(Channel,QueueDeclare) in<br>#'queue.declare_ok'{queue = Q,<br> message_count = MessageCount,<br>
consumer_count = ConsumerCount}<br> = amqp_channel:call(Channel,QueueDeclare),<br><br><br>-module(amqp_async_consume).<br><br>-include_lib("rabbitmq_server/include/rabbit_framing.hrl").<br>
-include_lib("rabbitmq_server/include/rabbit.hrl").<br>-include_lib("rabbitmq_client/include/amqp_client.hrl").<br><br>-export([amqp_lifecycle_consume/0]).<br><br>amqp_lifecycle_consume() -><br> User = Password = "guest",<br>
Realm = <<"/data">>,<br><br> %% Start a connection to the server<br><br> Connection = amqp_connection:start(User, Password),<br> io:format("Connection: ~p~n",[Connection]),<br>
%% Once you have a connection to the server, you can start an AMQP channel gain access to a realm<br><br> Channel = amqp_connection:open_channel(Connection),<br> Access = #'access.request'{realm = Realm,<br>
exclusive = false,<br> passive = true,<br> active = true,<br> write = true,<br> read = true},<br>
#'access.request_ok'{ticket = Ticket} = amqp_channel:call(Channel, Access),<br><br> %% Register a consumer to listen to a queue<br> Q = <<"">>, <br> X = <<"x">>,<br>
BindKey = <<"a.b.c.*">>,<br> <br> QueueDeclare = #'queue.declare'{ticket = Ticket, queue = Q,<br> passive = false, durable = false,<br> exclusive = false, auto_delete = false,<br>
nowait = false, arguments = []},<br> #'queue.declare_ok'{queue = Q,<br> message_count = MessageCount,<br> consumer_count = ConsumerCount}<br>
= amqp_channel:call(Channel,QueueDeclare),<br> <br> <br> %ExchangeDeclare = #'exchange.declare'{ticket = Ticket, exchange = X, type = <<"topic">>,<br> % passive = false, durable = false, auto_delete = false, internal = false,<br>
% nowait = false, arguments = []},<br> %#'exchange.declare_ok'{} = amqp_channel:call(Channel, ExchangeDeclare),<br> <br> QueueBind = #'queue.bind'{ticket = Ticket, queue = Q, exchange = X,<br>
routing_key = BindKey, nowait = false, arguments = []},<br> #'queue.bind_ok'{} = amqp_channel:call(Channel, QueueBind),<br><br> <br> <br> %ConsumerTag = <<"">>,<br>
%DeliveryTag = <<"">>,<br> BasicConsume = #'basic.consume'{ticket = Ticket,<br> queue = Q,<br> consumer_tag = <<"">>,<br>
no_local = false,<br> no_ack = true,<br> exclusive = false,<br> nowait = false},<br>
#'basic.consume_ok'{consumer_tag = ConsumerTag} <br> = amqp_channel:call(Channel, BasicConsume, self()),<br><br> %% If the registration was sucessful, then consumer will be notified<br><br> receive<br>
#'basic.consume_ok'{consumer_tag = ConsumerTag} -> ok<br> end,<br><br> %% When a message is routed to the queue, it will then be delivered to this consumer<br><br> receive<br> {#'basic.deliver'{delivery_tag = DeliveryTag}, Content} -><br>
#content{payload_fragments_rev = [Payload]} = Content,<br> io:format("Message received: ~p~n", [Payload])<br> after 2000 -><br> exit(did_not_receive_message)<br> end,<br><br>
%% After the consumer is finished interacting with the queue, it can deregister itself<br><br> BasicCancel = #'basic.cancel'{consumer_tag = ConsumerTag,<br> nowait = false},<br>
#'basic.cancel_ok'{consumer_tag = ConsumerTag} = amqp_channel:call(Channel,BasicCancel).<br><br>Thank you,<br>Joe<br>