[rabbitmq-discuss] Flow Control

Ben Hood 0x6e6562 at gmail.com
Mon Nov 10 17:36:22 GMT 2008


On Mon, Nov 10, 2008 at 4:18 PM, Edwin Fine
<rabbitmq-discuss_efine at usa.net> wrote:
>> Also, even if you use a call, how will you know that the message
>> hasn't been binned by the gen_server instance that you invoked
>> quasi-synchronously?
> Because the call will return an ok?

Sure, but all that means is that the gen_server instance has received
your message and said ok. It doesn't mean that the broker has accepted
responsibility for it.

> Application level acknowledgement, meaning that my consumer explicitly sends
> an application-level acknowledgement message to the producer? Let me
> hopefully not annoy you by again comparing with WebSphere MQ. When you PUT a
> message to the queue manager, you get a return code which, if it is a
> success, means that the queue manager has got your message and it's all
> good. No app level acks needed. No transactions needed. With using a cast to
> send a message to the broker, if the client has a problem with the message
> (this happened to me recently), the producer has no idea of this. I only
> found out by seeing errors in my log file. Or at least, I don't know of a
> way to find out other than by starting a transaction, which I am loath to do
> because of performance concerns.

Ok, there are a few separate issues here:

1) I said that in general app level acknowledgements are the best
thing to have in terms of responsibility transfer - but they are not
always practical or possible (e.g. fire and forget semantics) and they
are not strictly necessary;

2) You are describing responsibility transfer in terms of MQ series -
the comparison to make would be between sending messages on a
per-message ack basis to MQ series and wrapping a whole batch of AMQP
basic.publish commands in a TX - and then measuring the difference;

3) Are you aware of what overhead using AMQP transactions has in your
particular use case? Remember that the semantics of a TX in AMQP is
not necessarily identical to that of MQ series.

4) I agree that using a cast will swallow bugs whereas with a call the
error will be reported inline to the invocation. However, a cast is a
lot cheaper than a call (weren't you talking about TX's being expensive?)
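
To make point 4 concrete, here is a minimal sketch of the call/cast
trade-off using a plain gen_server. The module name publish_demo and
its functions are hypothetical and are not part of the RabbitMQ Erlang
client; the server only stands in for the client-side step that encodes
a payload. Note that even the call variant only tells you that the
local process accepted the payload, not that the broker has taken
responsibility for it.

```erlang
-module(publish_demo).
-behaviour(gen_server).

%% Hypothetical API, for illustration only.
-export([start_link/0, publish_cast/2, publish_call/2, published/1]).
-export([init/1, handle_call/3, handle_cast/2]).

start_link() -> gen_server:start_link(?MODULE, [], []).

%% Fire and forget: returns ok even if the payload is later rejected.
publish_cast(Pid, Payload) -> gen_server:cast(Pid, {publish, Payload}).

%% Synchronous: the caller sees the outcome of the encoding step.
publish_call(Pid, Payload) -> gen_server:call(Pid, {publish, Payload}).

%% Returns the payloads encoded so far, oldest first.
published(Pid) -> gen_server:call(Pid, published).

init([]) -> {ok, []}.

handle_call({publish, Payload}, _From, Sent) ->
    case encode(Payload) of
        {ok, Bin} -> {reply, ok, [Bin | Sent]};
        Error     -> {reply, Error, Sent}
    end;
handle_call(published, _From, Sent) ->
    {reply, lists:reverse(Sent), Sent}.

handle_cast({publish, Payload}, Sent) ->
    case encode(Payload) of
        {ok, Bin}  -> {noreply, [Bin | Sent]};
        {error, _} -> {noreply, Sent}  %% dropped; the caller never finds out
    end.

%% Stand-in for frame encoding: a record (tuple) is not an iolist.
encode(Payload) ->
    try {ok, iolist_to_binary(Payload)}
    catch error:badarg -> {error, badarg}
    end.
```

With this sketch, publish_cast(Pid, {frame_rec, 1}) returns ok and the
message is silently lost, whereas publish_call(Pid, {frame_rec, 1})
returns {error, badarg} to the caller at the cost of a round trip.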

> What happened is that I refactored some code and in doing so created a bug
> that tried to send a record instead of a list in the basic.publish. Yes, my
> fault, but I would have liked to have known about it at the code level. The
> message was lost because there apparently was no way to detect this, let
> alone recover from it (except maybe the txn, which I have not tried).
> == 8-Nov-2008::20:08:59 == ERROR -  emulator
> Error in process <0.191.0> on node 'xhg_rel at ender' with exit value:
> {badarg,[{erlang,list_to_binary,[{frame_rec,<<217
> bytes>>,1226192940,0,active,ok,0,undefined,0,<<33
> bytes>>}]},{rabbit_binary_generator,create_frame,3},{rabbit_binary_generator,build_content_frames,5},{rabbit_binary_generator,build_simple_content_frames...

That is a fair point. May I suggest that we let the application choose
what it wants to do - adding an equivalent call command would be
trivial. What do you think?

>> Have you looked at the test that I wrote for this in the test_util
>> module - it does pretty much what you describe.
> I have looked at the test, but it does not do anything like what I was
> trying to describe :)
> What I was trying to describe was having a gen_server or gen_fsm that has a
> clause something like this;
> handle_info(#'channel.flow'{active = false}, State) ->
>     {noreply, State#state{channel_flow = false}};
> handle_info(#'channel.flow'{active = true}, State) ->
>     {noreply, State#state{channel_flow = true}};

Sure, but this is all on the receiving process' side, so whether it
handles it as a gen_server or as a plain jane receive block *should*
be irrelevant. Furthermore, the receiving process may just ignore the
notification or it may not register itself in the first place.

So I think the more important side of things is how the channel
process propagates the event that it receives from the broker.

And this is what I have done - when the amqp_channel receives the
#'channel.flow'{active = false} from the broker, it sends the
registered listener a pause atom - in the reverse case it sends it a
resume atom.

Hence what you are suggesting is equivalent to what I have already done :-)
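
As a rough sketch of the listener side, assuming only that the channel
process sends the bare atoms pause and resume to the registered
listener as described above: the module flow_listener, its publish/2
helper and the {error, flow_paused} reply are hypothetical, and how the
listener registers itself with the channel is deliberately left out.

```erlang
-module(flow_listener).
-export([start/0, publish/2, loop/1]).

%% Start a producer process that assumes the flow is initially active.
start() -> spawn(?MODULE, loop, [active]).

%% Ask the producer to publish; the reply says whether it went out.
publish(Pid, Msg) ->
    Ref = make_ref(),
    Pid ! {publish, self(), Ref, Msg},
    receive
        {Ref, Reply} -> Reply
    after 1000 ->
        timeout
    end.

loop(Flow) ->
    receive
        pause  -> loop(paused);   %% channel saw channel.flow active = false
        resume -> loop(active);   %% channel saw channel.flow active = true
        {publish, From, Ref, _Msg} when Flow =:= active ->
            %% a real producer would publish on the channel here
            From ! {Ref, ok},
            loop(Flow);
        {publish, From, Ref, _Msg} ->
            %% tell the caller instead of silently discarding
            From ! {Ref, {error, flow_paused}},
            loop(Flow)
    end.
```

The same two atoms could equally be matched in gen_server handle_info
clauses that flip a field in the server state; the channel process does
not need to know which style the listener uses.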

> When receiving the "down" signal, the producer can then go into an idle
> state voluntarily. I don't like the idea of the client silently throwing
> away producer messages.

See the comment above giving the app the choice between the "really
safe" call and the "really cool" cast.

> I really don't like the sound of that. I don't like silent discards in
> general.

Is this a re-transmission issue in your mail? ;-)

>> The only issue I see is the added complexity involved in the
>> application - it would have to have a receive loop that matches on the
>> pause and resume atoms sent to it by the channel process.
> Not if done the way I suggested. Then it just becomes another handle_info
> clause and state change.

As indicated above, this can easily be processed by the app as a
gen_server callback, but that is down to the client. Having said that,
what you are suggesting is sensible and probably would constitute best

