[rabbitmq-discuss] pub-sub part 2: erlang consume
Joe Lee
codewalkerjoe at gmail.com
Thu Apr 10 00:17:25 BST 2008
I have published the message asynchronously to the exchange. Now to consume
the messages from the exchange, I can use either BasicGet or BasicConsume.
When you are consuming messages in pub/sub model, do you have to specify
what exchange to look for? In addition, when you consume message from an
exchange regardless of messaging model(pub/sub), you still have to declare a
queue, bind it and consume the message from the queue?
BasicConsume and BasicGet there is a variable named queue = Q. In pub-sub
case, how do you define null queue in Erlang-client? In basic.consume I set
the queue = <<"">> like below:
BasicConsume = #'basic.consume'{ticket = Ticket,
queue = <<"">>,
In basic.publish, I set immediate = false so that when the consumer started
later, message will be in the queue for it to consume.
Rabbitmq server throwing errors when I run the consumer code after the
message have been published to the exchange.
(rabbit at home)5> amqp_async_consume:amqp_lifecycle_consume().
Connection: {<0.178.0>,direct}
** exception exit: {{amqp_async_consume,amqp_lifecycle_consume,0},
{line,40},
match,
[{'queue.declare_ok',
<<"amq.q.gen1_rabbit at home_20080409224223_">>,0,0}]}
in function amqp_async_consume:amqp_lifecycle_consume/0
This is the line causing the problem:
amqp_channel:call(Channel,QueueDeclare) in
#'queue.declare_ok'{queue = Q,
message_count = MessageCount,
consumer_count = ConsumerCount}
= amqp_channel:call(Channel,QueueDeclare),
-module(amqp_async_consume).
-include_lib("rabbitmq_server/include/rabbit_framing.hrl").
-include_lib("rabbitmq_server/include/rabbit.hrl").
-include_lib("rabbitmq_client/include/amqp_client.hrl").
-export([amqp_lifecycle_consume/0]).
amqp_lifecycle_consume() ->
User = Password = "guest",
Realm = <<"/data">>,
%% Start a connection to the server
Connection = amqp_connection:start(User, Password),
io:format("Connection: ~p~n",[Connection]),
%% Once you have a connection to the server, you can start an AMQP
channel gain access to a realm
Channel = amqp_connection:open_channel(Connection),
Access = #'access.request'{realm = Realm,
exclusive = false,
passive = true,
active = true,
write = true,
read = true},
#'access.request_ok'{ticket = Ticket} = amqp_channel:call(Channel,
Access),
%% Register a consumer to listen to a queue
Q = <<"">>,
X = <<"x">>,
BindKey = <<"a.b.c.*">>,
QueueDeclare = #'queue.declare'{ticket = Ticket, queue = Q,
passive = false, durable = false,
exclusive = false, auto_delete = false,
nowait = false, arguments = []},
#'queue.declare_ok'{queue = Q,
message_count = MessageCount,
consumer_count = ConsumerCount}
= amqp_channel:call(Channel,QueueDeclare),
%ExchangeDeclare = #'exchange.declare'{ticket = Ticket, exchange = X,
type = <<"topic">>,
% passive = false, durable = false,
auto_delete = false, internal = false,
% nowait = false, arguments = []},
%#'exchange.declare_ok'{} = amqp_channel:call(Channel, ExchangeDeclare),
QueueBind = #'queue.bind'{ticket = Ticket, queue = Q, exchange = X,
routing_key = BindKey, nowait = false,
arguments = []},
#'queue.bind_ok'{} = amqp_channel:call(Channel, QueueBind),
%ConsumerTag = <<"">>,
%DeliveryTag = <<"">>,
BasicConsume = #'basic.consume'{ticket = Ticket,
queue = Q,
consumer_tag = <<"">>,
no_local = false,
no_ack = true,
exclusive = false,
nowait = false},
#'basic.consume_ok'{consumer_tag = ConsumerTag}
= amqp_channel:call(Channel, BasicConsume, self()),
%% If the registration was sucessful, then consumer will be notified
receive
#'basic.consume_ok'{consumer_tag = ConsumerTag} -> ok
end,
%% When a message is routed to the queue, it will then be delivered to
this consumer
receive
{#'basic.deliver'{delivery_tag = DeliveryTag}, Content} ->
#content{payload_fragments_rev = [Payload]} = Content,
io:format("Message received: ~p~n", [Payload])
after 2000 ->
exit(did_not_receive_message)
end,
%% After the consumer is finished interacting with the queue, it can
deregister itself
BasicCancel = #'basic.cancel'{consumer_tag = ConsumerTag,
nowait = false},
#'basic.cancel_ok'{consumer_tag = ConsumerTag} =
amqp_channel:call(Channel,BasicCancel).
Thank you,
Joe
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20080409/5c96a12d/attachment.htm
More information about the rabbitmq-discuss
mailing list