[rabbitmq-discuss] Spying on "mandatory" messages

Charly Hamy charly.hamy at gmail.com
Fri Mar 25 17:36:53 GMT 2011


Matthew,


> This is very impressive for a first hack on Rabbit.
Thanks, I'm flattered! :)
I tried to follow your comments to improve the code...


> I would imagine you're missing a diff here: surely the handle_call in
> amqqueue_process for the mandatory delivery that should be returning
> true|spy?

You're right! I forgot to post that piece, here to everything's not working
that well...


My final list of changes :

*
rabbit.hrl :
(edition of the amqqueue record, line 47)

     -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
spy=false,
                   arguments, pid}).


rabbit_amqqueue.erl :

(line 115, spec of deliver/2 edited )

 -spec(deliver/2 :: (pid(), rabbit_types:delivery()) -> boolean()|'spy').


(edition of rabbit_amqqueue:deliver/2)

     deliver(QPid, Delivery = #delivery{immediate = true}) ->
          gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity);
     deliver(QPid, Delivery = #delivery{mandatory = true}) ->
          case gen_server2:call(QPid, {deliver, Delivery}, infinity) of
               spy -> spy;
               _ -> true
          end;
     deliver(QPid, Delivery) ->
          gen_server2:cast(QPid, {deliver, Delivery}),
          true.

rabbit_amqqueue_process.erl :

(edition of rabbit_amqqueue_process:declare/3)

declare(Recover, From,
            State = #q{q
 = Qparam = #amqqueue{
name = QName,
durable = IsDurable,
arguments = Args
},
            backing_queue = BQ,
backing_queue_state = undefined,
            stats_timer = StatsTimer
}) ->
    Spy = case rabbit_misc:table_lookup(Args, <<"spy">>) of
{'bool', true} -> true;
             _ -> false
    end,
    Q = Qparam#amqqueue{ spy = Spy},
    case rabbit_amqqueue:internal_declare(Q, Recover) of
        not_found -> {stop, normal, not_found, State};
        Q         -> gen_server2:reply(From, {new, Q}),
                     ok = file_handle_cache:register_callback(
                            rabbit_amqqueue, set_maximum_since_use,
                            [self()]),
                     ok = rabbit_memory_monitor:register(
                            self(), {rabbit_amqqueue,
                                     set_ram_duration_target, [self()]}),
                     BQS = bq_init(BQ, QName, IsDurable, Recover),
                     State1 = process_args(State#q{backing_queue_state =
BQS, q=Q}),
                     rabbit_event:notify(queue_created,
                                         infos(?CREATION_EVENT_KEYS,
State1)),
                     rabbit_event:if_enabled(StatsTimer,
                                             fun() -> emit_stats(State1)
end),
                     noreply(State1);
        Q1        -> {stop, normal, {existing, Q1}, State}
    end.


( edition of rabbit_amqqueue_procces:handle_call({deliver, _} ..., line 846
)

handle_call({deliver, Delivery}, From, State) ->
    %% Synchronous, "mandatory" delivery mode. Reply asap.
    case (State#q.q)#amqqueue.spy of
true  -> gen_server2:reply(From, spy);
false -> gen_server2:reply(From, true)
    end,
    noreply(deliver_or_enqueue(Delivery, State));


rabbit_amqqueue_router.erl :
(edition of rabbit_router:fold_deliveries/2, line 105)

     fold_deliveries({Pid, true},{_, Handled}) -> {true, [Pid|Handled]};
     fold_deliveries({_,  spy}, {false, Handled}) -> {false, Handled};
     fold_deliveries({_,  _},{_, Handled}) -> {true, Handled}.

*

I encountered no problem for the moment, testing that feature with the java
API.


On Fri, Mar 25, 2011 at 4:04 PM, Matthew Sackman <matthew at rabbitmq.com>wrote:

> On Fri, Mar 25, 2011 at 02:58:43PM +0000, Matthew Sackman wrote:
> > I'd probably bring that out to a Spy variable. Also, take a look at
> > rabbit_misc:table_lookup/2. Also, you might want to delay that
> > inspection until you're in amqqueue_process:delay
>
> Erm, I meant amqqueue_process:declare there. Fingers don't work...
>
> Matthew
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss at lists.rabbitmq.com
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20110325/1c04f739/attachment-0001.htm>


More information about the rabbitmq-discuss mailing list