[rabbitmq-discuss] c amqp consumer frame header issue
Alan Antonuk
alan.antonuk at gmail.com
Sun Jun 30 08:14:05 BST 2013
When a message is delivered over AMQP to a client with a consumer, the
client will receive the following frames in this order:
- a method frame containing the basic.deliver method
- a header frame containing the properties structure
- one or more body frames containing the message data
Note that this happens in the context of a single channel. AMQP multiplexes
multiple channels over a single connection. My understanding of AMQP is
that as a client, once you receive a basic.deliver method on a channel, the
next 2 or more frames on that channel will be a header frame and one or more
body frames, with no other frames in between unless there's some kind of
channel exception, in which case you'll receive a channel.close and delivery
of that message is aborted.
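As a rough illustration of that ordering, here is a minimal Python sketch of a reader for a single channel. It is not rabbitmq-c or pika code: the (frame_type, payload) tuples and the function name assemble_delivery are made up for this example, and the payloads are simplified (method name, declared body size, raw bytes). It also assumes that a header declaring a body size of 0 is followed by no body frames.

```python
METHOD, HEADER, BODY = 1, 2, 3  # AMQP 0-9-1 frame type codes


def assemble_delivery(frames):
    """Assemble one message body from the frames of a single channel.

    `frames` is an iterable of (frame_type, payload) tuples (a hypothetical
    representation): the method name for METHOD frames, the declared body
    size for HEADER frames, and the raw bytes for BODY frames.
    """
    state = "method"
    body_size = 0
    received = 0
    chunks = []
    for frame_type, payload in frames:
        if state == "method":
            if frame_type != METHOD or payload != "basic.deliver":
                raise ValueError("expected a basic.deliver method frame")
            state = "header"
        elif state == "header":
            if frame_type != HEADER:
                raise ValueError("expected a header frame after basic.deliver")
            body_size = payload
            if body_size == 0:
                return b""  # assumption: an empty body has no body frames
            state = "body"
        else:
            if frame_type != BODY:
                raise ValueError("expected a body frame")
            chunks.append(payload)
            received += len(payload)
            if received >= body_size:
                return b"".join(chunks)
    raise ValueError("frame stream ended before the message was complete")


frames = [(METHOD, "basic.deliver"), (HEADER, 11),
          (BODY, b"hello "), (BODY, b"world")]
message = assemble_delivery(frames)
```

The reader decides when the message is complete from the body size declared in the header frame, rather than from the number of body frames.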
Why does this matter? Because amqp_simple_wait_frame() returns the next
frame *for the connection*. Thus if you have more than one consumer running
on different channels, the logic for reading messages from the broker
becomes a little more involved in that you need to separate out frames
based on their channel, then construct the messages from there.
(Unfortunately the rabbitmq-c examples don't cover this scenario)
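To make the idea concrete, here is a small Python sketch of separating frames by channel and assembling messages per channel. It is only a sketch under stated assumptions: the ChannelDemux class and its feed() method are hypothetical names, and each frame is reduced to a (channel, frame_type, payload) tuple standing in for whatever the connection-level read returns. With rabbitmq-c the same bookkeeping would be keyed on the channel of each frame returned by amqp_simple_wait_frame().

```python
METHOD, HEADER, BODY = 1, 2, 3  # AMQP 0-9-1 frame type codes


class ChannelDemux:
    """Hypothetical helper: tracks one partially received message per channel."""

    def __init__(self):
        self._partial = {}  # channel number -> state of the in-flight message

    def feed(self, channel, frame_type, payload):
        """Consume one frame; return the message body once it is complete."""
        if frame_type == METHOD:
            if payload == "basic.deliver":
                # A new delivery starts on this channel.
                self._partial[channel] = {"size": None, "chunks": [], "received": 0}
            else:
                # Any other method (e.g. channel.close) aborts a partial message.
                self._partial.pop(channel, None)
            return None

        partial = self._partial.get(channel)
        if partial is None:
            raise ValueError("content frame without basic.deliver on channel %d" % channel)

        if frame_type == HEADER:
            if partial["size"] is not None:
                raise ValueError("duplicate header frame on channel %d" % channel)
            partial["size"] = payload
        elif frame_type == BODY:
            if partial["size"] is None:
                raise ValueError("body frame before header on channel %d" % channel)
            partial["chunks"].append(payload)
            partial["received"] += len(payload)
        else:
            raise ValueError("unexpected frame type %d" % frame_type)

        if partial["received"] >= partial["size"]:
            del self._partial[channel]
            return b"".join(partial["chunks"])
        return None


# Frames from two consumers interleaved on one connection.
stream = [
    (1, METHOD, "basic.deliver"),
    (2, METHOD, "basic.deliver"),
    (1, HEADER, 3),
    (2, HEADER, 4),
    (2, BODY, b"wo"),
    (1, BODY, b"one"),
    (2, BODY, b"rd"),
]
demux = ChannelDemux()
completed = []
for channel, frame_type, payload in stream:
    body = demux.feed(channel, frame_type, payload)
    if body is not None:
        completed.append((channel, body))
```

Keeping the state in a dictionary keyed by channel number means frames for one channel never disturb a message that is still being received on another.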
This is why I asked for a bit more context as to what your program was
doing outside of the code you provided. (I still don't have all the
details necessary to understand what you're doing, e.g., which amqp_*
commands you're issuing before you get to the message reader loop.)
-Alan
On Sat, Jun 29, 2013 at 6:04 PM, cogitate <monish.unni at gmail.com> wrote:
> hi alan:
> right now, i am not running the program with those conditions =>
> [1] when no routing_key is found and the message is sent to the bound AE
> and a consumer of the AE tries to get the message.
> [2] when, due to message-ttl, the message ends up at the DLX consumer.
>
> however, just for your reference, it seems this is what pika is doing (see
> below), and it just seems that it doesn't expect the frames to be in any
> certain order like the C consumer (if , else if , else if...)
>
> i'll however, try and reproduce the conditions in the meanwhile.
>
>
> def decode_frame(data_in):
>     """
>     Receives raw socket data and attempts to turn it into a frame.
>     Returns bytes used to make the frame and the frame
>     """
>     # Look to see if it's a protocol header frame
>     try:
>         if data_in[0:4] == 'AMQP':
>             major, minor, revision = struct.unpack_from('BBB', data_in, 5)
>             return 8, ProtocolHeader(major, minor, revision)
>     except IndexError:
>         # We didn't get a full frame
>         return 0, None
>     except struct.error:
>         # We didn't get a full frame
>         return 0, None
>
>     # Get the Frame Type, Channel Number and Frame Size
>     try:
>         frame_type, channel_number, frame_size = \
>             struct.unpack('>BHL', data_in[0:7])
>     except struct.error:
>         # We didn't get a full frame
>         return 0, None
>
>     # Get the frame data
>     frame_end = spec.FRAME_HEADER_SIZE +\
>                 frame_size +\
>                 spec.FRAME_END_SIZE
>
>     # We don't have all of the frame yet
>     if frame_end > len(data_in):
>         return 0, None
>
>     # The Frame termination chr is wrong
>     if data_in[frame_end - 1] != chr(spec.FRAME_END):
>         raise exceptions.InvalidFrameError("Invalid FRAME_END marker")
>
>     # Get the raw frame data
>     frame_data = data_in[spec.FRAME_HEADER_SIZE:frame_end - 1]
>
>     if frame_type == spec.FRAME_METHOD:
>
>         # Get the Method ID from the frame data
>         method_id = struct.unpack_from('>I', frame_data)[0]
>
>         # Get a Method object for this method_id
>         method = spec.methods[method_id]()
>
>         # Decode the content
>         method.decode(frame_data, 4)
>
>         # Return the amount of data consumed and the Method object
>         return frame_end, Method(channel_number, method)
>
>     elif frame_type == spec.FRAME_HEADER:
>
>         # Return the header class and body size
>         class_id, weight, body_size = struct.unpack_from('>HHQ',
>                                                          frame_data)
>
>         # Get the Properties type
>         properties = spec.props[class_id]()
>
>         # Decode the properties
>         out = properties.decode(frame_data[12:])
>
>         # Return a Header frame
>         return frame_end, Header(channel_number, body_size, properties)
>
>     elif frame_type == spec.FRAME_BODY:
>
>         # Return the amount of data consumed and the Body frame w/ data
>         return frame_end, Body(channel_number, frame_data)
>
>     elif frame_type == spec.FRAME_HEARTBEAT:
>
>         # Return the amount of data and a Heartbeat frame
>         return frame_end, Heartbeat()
>
>     raise exceptions.InvalidFrameError("Unknown frame type: %i" %
>                                        frame_type)
>
>
>
>
> --
> View this message in context:
> http://rabbitmq.1065348.n5.nabble.com/c-amqp-consumer-frame-header-issue-tp27685p27692.html
> Sent from the RabbitMQ mailing list archive at Nabble.com.
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss at lists.rabbitmq.com
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>