<div dir="ltr">When a message is delivered over AMQP to a client with a consumer, the client receives the following frames in this order:<div><br></div><div>- a method frame containing the basic.deliver method</div><div>
- a header frame containing the properties structure</div><div>- zero or more body frames containing the message data (an empty message has none)</div><div><br></div><div>Note that this happens in the context of a single channel; AMQP multiplexes multiple channels over a single connection. My understanding of AMQP is that, as a client, once you receive a basic.deliver method on a channel, the next frames on that channel will be a header frame followed by the body frames, with no other frames in between unless there's some kind of channel exception, in which case you'll receive a channel.close and delivery of the message is aborted.</div>
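<div><br></div><div>As a rough illustration of that ordering on a single channel, here is a minimal Python sketch. Frame and assemble are hypothetical names made up for this example (they are not rabbitmq-c or pika types), and the sketch assumes the header frame carries the total body size, which is how the reader knows when the last body frame has arrived:</div>

```python
from dataclasses import dataclass


# Hypothetical frame representation for illustration only;
# not a rabbitmq-c or pika type.
@dataclass
class Frame:
    kind: str            # "method", "header", or "body"
    payload: bytes = b""
    body_size: int = 0   # only meaningful on header frames


def assemble(frames):
    """Read one message from an iterable of frames on a single channel."""
    frames = iter(frames)

    method = next(frames)
    if method.kind != "method":
        raise ValueError("expected a method frame (basic.deliver) first")

    header = next(frames)
    if header.kind != "header":
        raise ValueError("expected a header frame after the method frame")

    # Keep reading body frames until the size announced in the header is met.
    body = b""
    while header.body_size > len(body):
        frame = next(frames)
        if frame.kind != "body":
            # e.g. a channel.close method arriving mid-message
            raise ValueError("message aborted: got %r frame" % frame.kind)
        body += frame.payload
    return body
```

<div>Anything other than a body frame arriving before the announced size is reached is treated as an aborted message, which is the channel.close case described above.</div>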
<div><br></div><div>Why does this matter? Because amqp_simple_wait_frame() returns the next frame *for the connection*. So if you have more than one consumer running on different channels, the logic for reading messages from the broker becomes a little more involved: you need to separate out frames based on their channel, then construct the messages from there. (Unfortunately the rabbitmq-c examples don't cover this scenario.)</div>
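<div><br></div><div>To make the per-channel bookkeeping concrete, here is a small Python sketch of the idea. It is not rabbitmq-c code: demultiplex and the (channel, kind, value) tuples are hypothetical stand-ins for whatever your frame-reading loop produces, and it assumes well-formed input (no channel.close handling):</div>

```python
def demultiplex(frames):
    """Yield (channel, body) each time a channel's message is complete.

    `frames` yields hypothetical (channel, kind, value) tuples, where kind
    is "method", "header" or "body". For a header, value is the body size;
    for a body frame, value is the bytes of that fragment.
    """
    state = {}  # channel number -> [expected body size, bytes so far]
    for channel, kind, value in frames:
        if kind == "method":
            # basic.deliver: a new message starts on this channel
            state[channel] = None
            continue
        if kind == "header":
            state[channel] = [value, bytearray()]
        elif kind == "body":
            state[channel][1].extend(value)

        size, body = state[channel]
        if len(body) == size:
            # All announced bytes have arrived for this channel
            del state[channel]
            yield channel, bytes(body)
```

<div>With rabbitmq-c, the channel number and frame type would come from the channel and frame_type fields of the amqp_frame_t you get back, and the expected size from payload.properties.body_size on the header frame.</div>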
<div><br></div><div>This is why I asked for a bit more context as to what your program was doing outside of the code you provided. (I still don't have all the details necessary to understand what you're doing; for example, which amqp_* calls you're issuing before you get to the message reader loop.)</div>
<div><br></div>
<div>-Alan</div></div><div class="gmail_extra"><br><br><div class="gmail_quote">On Sat, Jun 29, 2013 at 6:04 PM, cogitate <span dir="ltr"><<a href="mailto:monish.unni@gmail.com" target="_blank">monish.unni@gmail.com</a>></span> wrote:<br>
<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">hi alan:<br>
right now, i am not running the program with those conditions =><br>
[1] when no routing_key is found and the message is sent to bounded AE and a<br>
consumer of AE tries to get the message.<br>
[2] when due message-ttl the message ends up at DLX consumer.<br>
<br>
however, just for your reference, it seems this is what pika is doing (see<br>
below), and it just seems that it doesn't expect the frames to be in any<br>
certain order like the C consumer (if , else if , else if...)<br>
<br>
i'll however, try and reproduce the conditions in the meanwhile.<br>
<br>
<br>
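def decode_frame(data_in):<br>
&nbsp;&nbsp;&nbsp;&nbsp;"""<br>
&nbsp;&nbsp;&nbsp;&nbsp;Receives raw socket data and attempts to turn it into a frame.<br>
&nbsp;&nbsp;&nbsp;&nbsp;Returns bytes used to make the frame and the frame<br>
&nbsp;&nbsp;&nbsp;&nbsp;"""<br>
&nbsp;&nbsp;&nbsp;&nbsp;# Look to see if it's a protocol header frame<br>
&nbsp;&nbsp;&nbsp;&nbsp;try:<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if data_in[0:4] == 'AMQP':<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;major, minor, revision = struct.unpack_from('BBB', data_in, 5)<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return 8, ProtocolHeader(major, minor, revision)<br>
&nbsp;&nbsp;&nbsp;&nbsp;except IndexError:<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;# We didn't get a full frame<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return 0, None<br>
&nbsp;&nbsp;&nbsp;&nbsp;except struct.error:<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;# We didn't get a full frame<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return 0, None<br>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;# Get the Frame Type, Channel Number and Frame Size<br>
&nbsp;&nbsp;&nbsp;&nbsp;try:<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;frame_type, channel_number, frame_size = \<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;struct.unpack('>BHL', data_in[0:7])<br>
&nbsp;&nbsp;&nbsp;&nbsp;except struct.error:<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;# We didn't get a full frame<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return 0, None<br>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;# Get the frame data<br>
&nbsp;&nbsp;&nbsp;&nbsp;frame_end = spec.FRAME_HEADER_SIZE +\<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;frame_size +\<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;spec.FRAME_END_SIZE<br>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;# We don't have all of the frame yet<br>
&nbsp;&nbsp;&nbsp;&nbsp;if frame_end > len(data_in):<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return 0, None<br>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;# The Frame termination chr is wrong<br>
&nbsp;&nbsp;&nbsp;&nbsp;if data_in[frame_end - 1] != chr(spec.FRAME_END):<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;raise exceptions.InvalidFrameError("Invalid FRAME_END marker")<br>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;# Get the raw frame data<br>
&nbsp;&nbsp;&nbsp;&nbsp;frame_data = data_in[spec.FRAME_HEADER_SIZE:frame_end - 1]<br>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;if frame_type == spec.FRAME_METHOD:<br>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;# Get the Method ID from the frame data<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;method_id = struct.unpack_from('>I', frame_data)[0]<br>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;# Get a Method object for this method_id<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;method = spec.methods[method_id]()<br>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;# Decode the content<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;method.decode(frame_data, 4)<br>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;# Return the amount of data consumed and the Method object<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return frame_end, Method(channel_number, method)<br>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;elif frame_type == spec.FRAME_HEADER:<br>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;# Return the header class and body size<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;class_id, weight, body_size = struct.unpack_from('>HHQ', frame_data)<br>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;# Get the Properties type<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;properties = spec.props[class_id]()<br>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;# Decode the properties<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;out = properties.decode(frame_data[12:])<br>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;# Return a Header frame<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return frame_end, Header(channel_number, body_size, properties)<br>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;elif frame_type == spec.FRAME_BODY:<br>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;# Return the amount of data consumed and the Body frame w/ data<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return frame_end, Body(channel_number, frame_data)<br>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;elif frame_type == spec.FRAME_HEARTBEAT:<br>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;# Return the amount of data and a Heartbeat frame<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return frame_end, Heartbeat()<br>
<br>
&nbsp;&nbsp;&nbsp;&nbsp;raise exceptions.InvalidFrameError("Unknown frame type: %i" %<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;frame_type)<br>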
<br>
<br>
<br>
<br>
<div class="HOEnZb"><div class="h5">
</div></div></blockquote></div><br></div>