[rabbitmq-discuss] c amqp consumer frame header issue

cogitate monish.unni at gmail.com
Sun Jun 30 02:04:55 BST 2013


hi alan:
right now, i am not running the program under these conditions =>
[1] when no routing_key matches, the message is routed to the bound AE
(alternate exchange), and a consumer of the AE tries to get the message.
[2] when, due to message-ttl, the message ends up at the DLX (dead-letter
exchange) consumer.
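
for anyone trying to set up the same two conditions, here is a rough sketch of the declare arguments involved. the argument keys ("alternate-exchange", "x-message-ttl", "x-dead-letter-exchange") are RabbitMQ's own; the exchange names and helper functions are hypothetical and only build the dicts you would pass as the arguments table to exchange.declare / queue.declare.

```python
# Hypothetical names, used only for illustration.
AE_NAME = "my-ae"
DLX_NAME = "my-dlx"


def exchange_arguments(alternate_exchange):
    """Arguments table for exchange.declare (condition [1]).

    Messages that match no binding on the declared exchange are
    re-published to the alternate exchange instead of being dropped.
    """
    return {"alternate-exchange": alternate_exchange}


def queue_arguments(ttl_ms, dead_letter_exchange):
    """Arguments table for queue.declare (condition [2]).

    Messages older than ttl_ms milliseconds are dead-lettered to the
    given exchange, where the DLX consumer picks them up.
    """
    return {
        "x-message-ttl": ttl_ms,
        "x-dead-letter-exchange": dead_letter_exchange,
    }


if __name__ == "__main__":
    print(exchange_arguments(AE_NAME))
    print(queue_arguments(60000, DLX_NAME))
```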

however, just for your reference, this seems to be what pika is doing (see
below): it decodes each frame by its type and doesn't seem to expect the
frames to arrive in any particular order, unlike the C consumer
(if, else if, else if...)

in the meantime, i'll try to reproduce those conditions.


def decode_frame(data_in):
    """
    Receives raw socket data and attempts to turn it into a frame.
    Returns bytes used to make the frame and the frame
    """
    # Look to see if it's a protocol header frame
    try:
        if data_in[0:4] == 'AMQP':
            major, minor, revision = struct.unpack_from('BBB', data_in, 5)
            return 8, ProtocolHeader(major, minor, revision)
    except IndexError:
        # We didn't get a full frame
        return 0, None
    except struct.error:
        # We didn't get a full frame
        return 0, None

    # Get the Frame Type, Channel Number and Frame Size
    try:
        frame_type, channel_number, frame_size = \
            struct.unpack('>BHL', data_in[0:7])
    except struct.error:
        # We didn't get a full frame
        return 0, None

    # Get the frame data
    frame_end = spec.FRAME_HEADER_SIZE +\
                frame_size +\
                spec.FRAME_END_SIZE

    # We don't have all of the frame yet
    if frame_end > len(data_in):
        return 0, None

    # The Frame termination chr is wrong
    if data_in[frame_end - 1] != chr(spec.FRAME_END):
        raise exceptions.InvalidFrameError("Invalid FRAME_END marker")

    # Get the raw frame data
    frame_data = data_in[spec.FRAME_HEADER_SIZE:frame_end - 1]

    if frame_type == spec.FRAME_METHOD:

        # Get the Method ID from the frame data
        method_id = struct.unpack_from('>I', frame_data)[0]

        # Get a Method object for this method_id
        method = spec.methods[method_id]()

        # Decode the content
        method.decode(frame_data, 4)

        # Return the amount of data consumed and the Method object
        return frame_end, Method(channel_number, method)

    elif frame_type == spec.FRAME_HEADER:

        # Return the header class and body size
        class_id, weight, body_size = struct.unpack_from('>HHQ', frame_data)

        # Get the Properties type
        properties = spec.props[class_id]()

        # Decode the properties
        out = properties.decode(frame_data[12:])

        # Return a Header frame
        return frame_end, Header(channel_number, body_size, properties)

    elif frame_type == spec.FRAME_BODY:

        # Return the amount of data consumed and the Body frame w/ data
        return frame_end, Body(channel_number, frame_data)

    elif frame_type == spec.FRAME_HEARTBEAT:

        # Return the amount of data and a Heartbeat frame
        return frame_end, Heartbeat()

    raise exceptions.InvalidFrameError("Unknown frame type: %i" %
                                       frame_type)
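
to make the "no fixed order" point concrete, here is a small standalone sketch (mine, not pika's code, and written for Python 3 bytes rather than the Python 2 strings above). it reads the same 7-byte header (type, channel, size), checks the frame-end octet, and hands back whatever frame comes next, keyed by type. the frame type numbers and the 0xCE end marker are from AMQP 0-9-1; split_frames and make_frame are hypothetical helper names.

```python
import struct

# Frame type constants and end marker as defined by AMQP 0-9-1.
FRAME_METHOD, FRAME_HEADER, FRAME_BODY, FRAME_HEARTBEAT = 1, 2, 3, 8
FRAME_HEADER_SIZE = 7   # 1 octet type + 2 octets channel + 4 octets size
FRAME_END = 0xCE

NAMES = {
    FRAME_METHOD: "method",
    FRAME_HEADER: "header",
    FRAME_BODY: "body",
    FRAME_HEARTBEAT: "heartbeat",
}


def make_frame(frame_type, channel, payload):
    """Build one raw frame (hypothetical helper, for testing only)."""
    header = struct.pack(">BHL", frame_type, channel, len(payload))
    return header + payload + bytes([FRAME_END])


def split_frames(buf):
    """Split buf into complete frames, in whatever order they arrive.

    Returns (frames, consumed): frames is a list of
    (type_name, channel, payload) tuples, consumed is the number of
    bytes used. A trailing partial frame is left unconsumed.
    """
    frames = []
    pos = 0
    while len(buf) - pos >= FRAME_HEADER_SIZE:
        frame_type, channel, size = struct.unpack_from(">BHL", buf, pos)
        end = pos + FRAME_HEADER_SIZE + size + 1  # +1 for the frame-end octet
        if end > len(buf):
            break  # the rest of this frame has not arrived yet
        if buf[end - 1] != FRAME_END:
            raise ValueError("invalid frame-end marker")
        if frame_type not in NAMES:
            raise ValueError("unknown frame type: %d" % frame_type)
        payload = buf[pos + FRAME_HEADER_SIZE:end - 1]
        frames.append((NAMES[frame_type], channel, payload))
        pos = end
    return frames, pos


if __name__ == "__main__":
    data = make_frame(FRAME_BODY, 1, b"hi") + make_frame(FRAME_HEARTBEAT, 0, b"")
    print(split_frames(data))
```

the caller dispatches on the returned type name for each frame, so a heartbeat or a method frame showing up between a header and a body is handled the same way as any other frame, rather than being an error.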




--
View this message in context: http://rabbitmq.1065348.n5.nabble.com/c-amqp-consumer-frame-header-issue-tp27685p27692.html
Sent from the RabbitMQ mailing list archive at Nabble.com.