[rabbitmq-discuss] Spying on "mandatory" messages
Charly Hamy
charly.hamy at gmail.com
Fri Mar 25 17:36:53 GMT 2011
Matthew,
> This is very impressive for a first hack on Rabbit.
Thanks, I'm flattered! :)
I tried to follow your comments to improve the code...
> I would imagine you're missing a diff here: surely the handle_call in
> amqqueue_process for the mandatory delivery that should be returning
> true|spy?
You're right! I forgot to post that piece, here to everything's not working
that well...
My final list of changes :
*
rabbit.hrl :
(edition of the amqqueue record, line 47)
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
spy=false,
arguments, pid}).
rabbit_amqqueue.erl :
(line 115, spec of deliver/2 edited )
-spec(deliver/2 :: (pid(), rabbit_types:delivery()) -> boolean()|'spy').
(edition of rabbit_amqqueue:deliver/2)
deliver(QPid, Delivery = #delivery{immediate = true}) ->
gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity);
deliver(QPid, Delivery = #delivery{mandatory = true}) ->
case gen_server2:call(QPid, {deliver, Delivery}, infinity) of
spy -> spy;
_ -> true
end;
deliver(QPid, Delivery) ->
gen_server2:cast(QPid, {deliver, Delivery}),
true.
rabbit_amqqueue_process.erl :
(edition of rabbit_amqqueue_process:declare/3)
declare(Recover, From,
State = #q{q
= Qparam = #amqqueue{
name = QName,
durable = IsDurable,
arguments = Args
},
backing_queue = BQ,
backing_queue_state = undefined,
stats_timer = StatsTimer
}) ->
Spy = case rabbit_misc:table_lookup(Args, <<"spy">>) of
{'bool', true} -> true;
_ -> false
end,
Q = Qparam#amqqueue{ spy = Spy},
case rabbit_amqqueue:internal_declare(Q, Recover) of
not_found -> {stop, normal, not_found, State};
Q -> gen_server2:reply(From, {new, Q}),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use,
[self()]),
ok = rabbit_memory_monitor:register(
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
BQS = bq_init(BQ, QName, IsDurable, Recover),
State1 = process_args(State#q{backing_queue_state =
BQS, q=Q}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS,
State1)),
rabbit_event:if_enabled(StatsTimer,
fun() -> emit_stats(State1)
end),
noreply(State1);
Q1 -> {stop, normal, {existing, Q1}, State}
end.
( edition of rabbit_amqqueue_procces:handle_call({deliver, _} ..., line 846
)
handle_call({deliver, Delivery}, From, State) ->
%% Synchronous, "mandatory" delivery mode. Reply asap.
case (State#q.q)#amqqueue.spy of
true -> gen_server2:reply(From, spy);
false -> gen_server2:reply(From, true)
end,
noreply(deliver_or_enqueue(Delivery, State));
rabbit_amqqueue_router.erl :
(edition of rabbit_router:fold_deliveries/2, line 105)
fold_deliveries({Pid, true},{_, Handled}) -> {true, [Pid|Handled]};
fold_deliveries({_, spy}, {false, Handled}) -> {false, Handled};
fold_deliveries({_, _},{_, Handled}) -> {true, Handled}.
*
I encountered no problem for the moment, testing that feature with the java
API.
On Fri, Mar 25, 2011 at 4:04 PM, Matthew Sackman <matthew at rabbitmq.com>wrote:
> On Fri, Mar 25, 2011 at 02:58:43PM +0000, Matthew Sackman wrote:
> > I'd probably bring that out to a Spy variable. Also, take a look at
> > rabbit_misc:table_lookup/2. Also, you might want to delay that
> > inspection until you're in amqqueue_process:delay
>
> Erm, I meant amqqueue_process:declare there. Fingers don't work...
>
> Matthew
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss at lists.rabbitmq.com
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20110325/1c04f739/attachment-0001.htm>
More information about the rabbitmq-discuss
mailing list